You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:59:16 UTC

[25/47] flink git commit: [FLINK-2793] [runtime-web] Redirect to leader in non-standalone mode

[FLINK-2793] [runtime-web] Redirect to leader in non-standalone mode

Squashes:
5a88d5e [tests] Add HttpTestClient for testing HTTP responses
656d6d6 Split WebMonitor and LeaderRetrievalService start up
a7e8da8 Move generated /web files to src/main/resources

Add comment to webMonitorPort attribute and make line breaks more Scalaesque

Don't block on leader retrieval and only resolve associated job manager once

Make JobManagerRetriever independent of redirecting logic

This closes #1202.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77fc0cc4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77fc0cc4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77fc0cc4

Branch: refs/heads/master
Commit: 77fc0cc445e14eeef1952e0760f80912351574bd
Parents: d18f580
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Sep 25 11:44:53 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 20 00:16:52 2015 +0200

----------------------------------------------------------------------
 flink-runtime-web/README.md                     |     2 +-
 flink-runtime-web/pom.xml                       |    18 +
 .../webmonitor/ExecutionGraphHolder.java        |    21 +-
 .../webmonitor/JobManagerArchiveRetriever.java  |   111 -
 .../runtime/webmonitor/JobManagerRetriever.java |   189 +
 .../webmonitor/RuntimeMonitorHandler.java       |    69 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   |    60 +-
 .../files/StaticFileServerHandler.java          |   102 +-
 .../AbstractExecutionGraphRequestHandler.java   |     9 +-
 .../handlers/ClusterOverviewHandler.java        |    20 +-
 .../handlers/CurrentJobIdsHandler.java          |    10 +-
 .../handlers/CurrentJobsOverviewHandler.java    |    24 +-
 .../handlers/DashboardConfigHandler.java        |     3 +-
 .../handlers/HandlerRedirectUtils.java          |   105 +
 .../handlers/JobManagerConfigHandler.java       |     3 +-
 .../webmonitor/handlers/RequestHandler.java     |     8 +-
 .../handlers/TaskManagersHandler.java           |    22 +-
 .../src/main/resources/web/css/index.css        |   596 +
 .../src/main/resources/web/css/vendor.css       |  9183 ++
 .../main/resources/web/fonts/FontAwesome.otf    |   Bin 0 -> 93888 bytes
 .../resources/web/fonts/fontawesome-webfont.eot |   Bin 0 -> 60767 bytes
 .../resources/web/fonts/fontawesome-webfont.svg |   565 +
 .../resources/web/fonts/fontawesome-webfont.ttf |   Bin 0 -> 122092 bytes
 .../web/fonts/fontawesome-webfont.woff          |   Bin 0 -> 71508 bytes
 .../web/fonts/fontawesome-webfont.woff2         |   Bin 0 -> 56780 bytes
 .../main/resources/web/images/flink-logo.png    |   Bin 0 -> 6096 bytes
 .../src/main/resources/web/index.html           |    55 +
 .../src/main/resources/web/js/index.js          |  1420 +
 .../src/main/resources/web/js/index.js.orig     |  1150 +
 .../src/main/resources/web/js/vendor.js         | 81087 +++++++++++++++++
 .../web/partials/jobmanager/config.html         |    33 +
 .../web/partials/jobmanager/index.html          |    33 +
 .../web/partials/jobmanager/stdout.html         |    40 +
 .../web/partials/jobs/completed-jobs.html       |    53 +
 .../resources/web/partials/jobs/job.config.html |    57 +
 .../web/partials/jobs/job.exceptions.html       |    38 +
 .../main/resources/web/partials/jobs/job.html   |    48 +
 .../resources/web/partials/jobs/job.plan.html   |    31 +
 .../jobs/job.plan.node-list.accumulators.html   |    40 +
 .../jobs/job.plan.node-list.overview.html       |    60 +
 .../jobs/job.plan.node.accumulators.html        |    68 +
 .../partials/jobs/job.plan.node.subtasks.html   |    52 +
 .../web/partials/jobs/job.properties.html       |   140 +
 .../web/partials/jobs/job.statistics.html       |    40 +
 .../web/partials/jobs/job.timeline.html         |    23 +
 .../web/partials/jobs/job.timeline.vertex.html  |    30 +
 .../web/partials/jobs/running-jobs.html         |    53 +
 .../main/resources/web/partials/overview.html   |   147 +
 .../webmonitor/WebRuntimeMonitorITCase.java     |   329 +
 .../webmonitor/testutils/HttpTestClient.java    |   309 +
 flink-runtime-web/web-dashboard/gulpfile.js     |     4 +-
 .../web-dashboard/web/css/index.css             |   596 -
 .../web-dashboard/web/css/vendor.css            |  9183 --
 .../web-dashboard/web/fonts/FontAwesome.otf     |   Bin 93888 -> 0 bytes
 .../web/fonts/fontawesome-webfont.eot           |   Bin 60767 -> 0 bytes
 .../web/fonts/fontawesome-webfont.svg           |   565 -
 .../web/fonts/fontawesome-webfont.ttf           |   Bin 122092 -> 0 bytes
 .../web/fonts/fontawesome-webfont.woff          |   Bin 71508 -> 0 bytes
 .../web/fonts/fontawesome-webfont.woff2         |   Bin 56780 -> 0 bytes
 .../web-dashboard/web/images/flink-logo.png     |   Bin 6096 -> 0 bytes
 flink-runtime-web/web-dashboard/web/index.html  |    55 -
 flink-runtime-web/web-dashboard/web/js/index.js |  1420 -
 .../web-dashboard/web/js/vendor.js              | 81087 -----------------
 .../web/partials/jobmanager/config.html         |    33 -
 .../web/partials/jobmanager/index.html          |    33 -
 .../web/partials/jobmanager/stdout.html         |    40 -
 .../web/partials/jobs/completed-jobs.html       |    53 -
 .../web/partials/jobs/job.config.html           |    57 -
 .../web/partials/jobs/job.exceptions.html       |    38 -
 .../web-dashboard/web/partials/jobs/job.html    |    48 -
 .../web/partials/jobs/job.plan.html             |    31 -
 .../jobs/job.plan.node-list.accumulators.html   |    40 -
 .../jobs/job.plan.node-list.overview.html       |    60 -
 .../jobs/job.plan.node.accumulators.html        |    68 -
 .../partials/jobs/job.plan.node.subtasks.html   |    52 -
 .../web/partials/jobs/job.properties.html       |   140 -
 .../web/partials/jobs/job.statistics.html       |    40 -
 .../web/partials/jobs/job.timeline.html         |    23 -
 .../web/partials/jobs/job.timeline.vertex.html  |    30 -
 .../web/partials/jobs/running-jobs.html         |    53 -
 .../web-dashboard/web/partials/overview.html    |   147 -
 .../web/partials/taskmanager/index.html         |    57 -
 .../runtime/instance/AkkaActorGateway.java      |     5 +
 .../flink/runtime/util/StandaloneUtils.java     |     2 +
 .../flink/runtime/webmonitor/WebMonitor.java    |     2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |    97 +-
 .../runtime/messages/JobManagerMessages.scala   |    14 +
 .../runtime/minicluster/FlinkMiniCluster.scala  |     2 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |     1 +
 pom.xml                                         |     2 +-
 90 files changed, 96305 insertions(+), 94229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/README.md
----------------------------------------------------------------------
diff --git a/flink-runtime-web/README.md b/flink-runtime-web/README.md
index 9664681..b724163 100644
--- a/flink-runtime-web/README.md
+++ b/flink-runtime-web/README.md
@@ -93,7 +93,7 @@ bower install
 gulp
 ```
 
-The dashboard code is under `/app`. The result of the build process is under `/web`.
+The dashboard code is under `/app`. The result of the build process is under `/web` in the src/main/resources folder.
 
 When building Flink with Maven (in particular the `flink-dist` project), the generated
 files are copied into the build target, to the folder `resources/web-runtime-monitor`.

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index ffb68bc..727604f 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -103,6 +103,24 @@ under the License.
 			<version>${guava.version}</version>
 		</dependency>
 
+		<!-- ===================================================
+								Testing
+			=================================================== -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 09ede4c..f680306 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -31,6 +31,8 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.util.WeakHashMap;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archive.
  * <p>
@@ -43,23 +45,16 @@ public class ExecutionGraphHolder {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
 
-	/** Retrieves the current leading JobManager and its corresponding archive */
-	private final JobManagerArchiveRetriever retriever;
-
 	private final FiniteDuration timeout;
 
 	private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
 
-	public ExecutionGraphHolder(JobManagerArchiveRetriever retriever) {
-		this(retriever, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+	public ExecutionGraphHolder() {
+		this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
 
-	public ExecutionGraphHolder(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
-		if (retriever == null || timeout == null) {
-			throw new NullPointerException();
-		}
-		this.retriever = retriever;
-		this.timeout = timeout;
+	public ExecutionGraphHolder(FiniteDuration timeout) {
+		this.timeout = checkNotNull(timeout);
 	}
 
 	/**
@@ -68,15 +63,13 @@ public class ExecutionGraphHolder {
 	 * @param jid jobID of the execution graph to be retrieved
 	 * @return the retrieved execution graph or null if it is not retrievable
 	 */
-	public ExecutionGraph getExecutionGraph(JobID jid) {
+	public ExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
 		ExecutionGraph cached = cache.get(jid);
 		if (cached != null) {
 			return cached;
 		}
 
 		try {
-			ActorGateway jobManager = retriever.getJobManagerGateway();
-
 			if (jobManager != null) {
 				Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
 				Object result = Await.result(future, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java
deleted file mode 100644
index 91c9ad5..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-
-/**
- * Retrieves and stores the actor gateway to the current leading JobManager and its archive. In
- * case of an error, the {@link WebRuntimeMonitor} to which this instance is associated will be
- * stopped.
- */
-public class JobManagerArchiveRetriever implements LeaderRetrievalListener {
-
-	private static final Logger LOG = LoggerFactory.getLogger(JobManagerArchiveRetriever.class);
-
-	private final ActorSystem actorSystem;
-	private final FiniteDuration lookupTimeout;
-	private final FiniteDuration timeout;
-	private final WebMonitor webMonitor;
-
-	/** will be written and read concurrently */
-	private volatile ActorGateway jobManagerGateway;
-	private volatile ActorGateway archiveGateway;
-
-	public JobManagerArchiveRetriever(
-			WebMonitor webMonitor,
-			ActorSystem actorSystem,
-			FiniteDuration lookupTimeout,
-			FiniteDuration timeout) {
-		this.webMonitor = webMonitor;
-		this.actorSystem = actorSystem;
-		this.lookupTimeout = lookupTimeout;
-		this.timeout = timeout;
-	}
-
-	public ActorGateway getJobManagerGateway() {
-		return jobManagerGateway;
-	}
-
-	public ActorGateway getArchiveGateway() {
-		return archiveGateway;
-	}
-
-
-	@Override
-	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
-		if (leaderAddress != null && !leaderAddress.equals("")) {
-			try {
-				ActorRef jobManager = AkkaUtils.getActorRef(
-						leaderAddress,
-						actorSystem,
-						lookupTimeout);
-				jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
-
-				Future<Object> archiveFuture = jobManagerGateway.ask(
-						JobManagerMessages.getRequestArchive(),
-						timeout);
-
-				ActorRef archive = ((JobManagerMessages.ResponseArchive) Await.result(
-						archiveFuture,
-						timeout)
-				).actor();
-
-				archiveGateway = new AkkaActorGateway(archive, leaderSessionID);
-			} catch (Exception e) {
-				handleError(e);
-			}
-		}
-	}
-
-	@Override
-	public void handleError(Exception exception) {
-		LOG.error("Received error from LeaderRetrievalService.", exception);
-
-		try{
-			// stop associated webMonitor
-			webMonitor.stop();
-		} catch (Exception e) {
-			LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
new file mode 100644
index 0000000..7162639
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.ResponseWebMonitorPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Retrieves and stores the actor gateway to the current leading JobManager. In case of an error,
+ * the {@link WebRuntimeMonitor} to which this instance is associated will be stopped.
+ *
+ * <p>The job manager gateway only works if the web monitor and the job manager run in the same
+ * actor system, because many execution graph structures are not serializable. This breaks the nice
+ * leader retrieval abstraction and we have a special code path in case that another job manager is
+ * leader (see {@link org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils}. In such a
+ * case, we get the address of the web monitor of the leading job manager and redirect to it
+ * (instead of directly communicating with it).
+ */
+public class JobManagerRetriever implements LeaderRetrievalListener {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobManagerRetriever.class);
+
+	private final Object lock = new Object();
+
+	private final WebMonitor webMonitor;
+	private final ActorSystem actorSystem;
+	private final FiniteDuration lookupTimeout;
+	private final FiniteDuration timeout;
+
+	private volatile Tuple2<Promise<ActorGateway>, Promise<Integer>> leaderPromise =
+			new Tuple2<Promise<ActorGateway>, Promise<Integer>>(
+					new scala.concurrent.impl.Promise.DefaultPromise<ActorGateway>(),
+					new scala.concurrent.impl.Promise.DefaultPromise<Integer>());
+
+	public JobManagerRetriever(
+			WebMonitor webMonitor,
+			ActorSystem actorSystem,
+			FiniteDuration lookupTimeout,
+			FiniteDuration timeout) {
+
+		this.webMonitor = checkNotNull(webMonitor);
+		this.actorSystem = checkNotNull(actorSystem);
+		this.lookupTimeout = checkNotNull(lookupTimeout);
+		this.timeout = checkNotNull(timeout);
+	}
+
+	/**
+	 * Returns the leading job manager gateway and its web monitor port.
+	 */
+	public Option<Tuple2<ActorGateway, Integer>> getJobManagerGatewayAndWebPort() throws Exception {
+		Tuple2<Promise<ActorGateway>, Promise<Integer>> promise = leaderPromise;
+
+		if (!promise._1().isCompleted() || !promise._1().isCompleted()) {
+			return Option.empty();
+		}
+		else {
+			Promise<ActorGateway> leaderGatewayPromise = promise._1();
+			Promise<Integer> leaderWebPortPromise = promise._2();
+
+			ActorGateway leaderGateway = Await.result(leaderGatewayPromise.future(), timeout);
+			int leaderWebPort = Await.result(leaderWebPortPromise.future(), timeout);
+
+			return Option.apply(new Tuple2<>(leaderGateway, leaderWebPort));
+		}
+	}
+
+	/**
+	 * Awaits the leading job manager gateway and its web monitor port.
+	 */
+	public Tuple2<ActorGateway, Integer> awaitJobManagerGatewayAndWebPort() throws Exception {
+		Tuple2<Promise<ActorGateway>, Promise<Integer>> promise = leaderPromise;
+
+		Promise<ActorGateway> leaderGatewayPromise = promise._1();
+		Promise<Integer> leaderWebPortPromise = promise._2();
+
+		ActorGateway leaderGateway = Await.result(leaderGatewayPromise.future(), timeout);
+		int leaderWebPort = Await.result(leaderWebPortPromise.future(), timeout);
+
+		return new Tuple2<>(leaderGateway, leaderWebPort);
+	}
+
+	@Override
+	public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+		if (leaderAddress != null && !leaderAddress.equals("")) {
+			try {
+				final Promise<ActorGateway> gatewayPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+				final Promise<Integer> webPortPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+
+				final Tuple2<Promise<ActorGateway>, Promise<Integer>> newPromise = new Tuple2<>(
+						gatewayPromise, webPortPromise);
+
+				LOG.info("Retrieved leader notification {}:{}.", leaderAddress, leaderSessionID);
+
+				AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, lookupTimeout)
+						// Resolve the actor ref
+						.flatMap(new Mapper<ActorRef, Future<Object>>() {
+							@Override
+							public Future<Object> apply(ActorRef jobManagerRef) {
+								ActorGateway leaderGateway = new AkkaActorGateway(
+										jobManagerRef, leaderSessionID);
+
+								gatewayPromise.success(leaderGateway);
+
+								return leaderGateway.ask(JobManagerMessages
+										.getRequestWebMonitorPort(), timeout);
+							}
+						}, actorSystem.dispatcher())
+								// Request the web monitor port
+						.onComplete(new OnComplete<Object>() {
+							@Override
+							public void onComplete(Throwable failure, Object success) throws Throwable {
+								if (failure == null) {
+									int webMonitorPort = ((ResponseWebMonitorPort) success).port();
+									webPortPromise.success(webMonitorPort);
+
+									// Complete the promise
+									synchronized (lock) {
+										Tuple2<Promise<ActorGateway>, Promise<Integer>>
+												previousPromise = leaderPromise;
+
+										leaderPromise = newPromise;
+
+										if (!previousPromise._2().isCompleted()) {
+											previousPromise._1().completeWith(gatewayPromise.future());
+											previousPromise._2().completeWith(webPortPromise.future());
+										}
+									}
+								}
+								else {
+									LOG.warn("Failed to retrieve leader gateway and port.");
+								}
+							}
+						}, actorSystem.dispatcher());
+			}
+			catch (Exception e) {
+				handleError(e);
+			}
+		}
+	}
+
+	@Override
+	public void handleError(Exception exception) {
+		LOG.error("Received error from LeaderRetrievalService.", exception);
+
+		try {
+			// stop associated webMonitor
+			webMonitor.stop();
+		}
+		catch (Exception e) {
+			LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 4574519..b9369ea 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -20,51 +20,90 @@ package org.apache.flink.runtime.webmonitor;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.router.KeepAliveWrite;
 import io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.util.ExceptionUtils;
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.Promise;
 
 import java.nio.charset.Charset;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * The Netty channel handler that processes all HTTP requests.
  * This handler takes the path parameters and delegates the work to a {@link RequestHandler}.
  * This handler also deals with setting correct response MIME types and returning
  * proper codes, like OK, NOT_FOUND, or SERVER_ERROR.
  */
-@ChannelHandler.Sharable
 public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> {
-	
+
 	private static final Charset ENCODING = Charset.forName("UTF-8");
-	
+
 	private final RequestHandler handler;
-	
+
+	private final JobManagerRetriever retriever;
+
+	private final Promise<String> localJobManagerAddressPromise;
+
 	private final String contentType;
-	
-	public RuntimeMonitorHandler(RequestHandler handler) {
-		if (handler == null) {
-			throw new NullPointerException();
-		}
-		this.handler = handler;
+
+	private String localJobManagerAddress;
+
+	public RuntimeMonitorHandler(
+			RequestHandler handler,
+			JobManagerRetriever retriever,
+			Promise<String> localJobManagerAddressPromise) {
+
+		this.handler = checkNotNull(handler);
+		this.retriever = checkNotNull(retriever);
+		this.localJobManagerAddressPromise = checkNotNull(localJobManagerAddressPromise);
 		this.contentType = (handler instanceof RequestHandler.JsonResponse) ? "application/json" : "text/plain";
 	}
-	
+
 	@Override
 	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
+		if (localJobManagerAddress == null) {
+			localJobManagerAddress = localJobManagerAddressPromise.future().value().get().get();
+		}
+
+		Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
+
+		if (jobManager.isDefined()) {
+			String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
+					localJobManagerAddress, jobManager.get());
+
+			if (redirectAddress != null) {
+				HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path());
+				KeepAliveWrite.flush(ctx, routed.request(), redirect);
+			}
+			else {
+				respondAsLeader(ctx, routed, jobManager.get()._1());
+			}
+		}
+		else {
+			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
+		}
+	}
+
+	private void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) {
 		DefaultFullHttpResponse response;
-		
+
 		try {
-			String result = handler.handleRequest(routed.pathParams());
+			String result = handler.handleRequest(routed.pathParams(), jobManager);
 			byte[] bytes = result.getBytes(ENCODING);
-			
+
 			response = new DefaultFullHttpResponse(
 					HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 649cf75..40ab6c1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -61,6 +61,7 @@ import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -84,7 +85,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
  */
 public class WebRuntimeMonitor implements WebMonitor {
 
-	/** By default, all requests to the JobManager have a timeout of 10 seconds */ 
+	/** By default, all requests to the JobManager have a timeout of 10 seconds */
 	public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
 
 	/** Logger for web frontend startup / shutdown messages */
@@ -104,13 +105,15 @@ public class WebRuntimeMonitor implements WebMonitor {
 	private final LeaderRetrievalService leaderRetrievalService;
 
 	/** LeaderRetrievalListener which stores the currently leading JobManager and its archive */
-	private final JobManagerArchiveRetriever retriever;
+	private final JobManagerRetriever retriever;
 
 	private final Router router;
 
 	private final int configuredPort;
 
-	private ServerBootstrap bootstrap;
+	private final ServerBootstrap bootstrap;
+
+	private final Promise<String> jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
 
 	private Channel serverChannel;
 
@@ -120,10 +123,9 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 
 	public WebRuntimeMonitor(
-				Configuration config,
-				LeaderRetrievalService leaderRetrievalService,
-				ActorSystem actorSystem) throws IOException
-	{
+			Configuration config,
+			LeaderRetrievalService leaderRetrievalService,
+			ActorSystem actorSystem) throws IOException, InterruptedException {
 		this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
 
 		final WebMonitorConfig cfg = new WebMonitorConfig(config);
@@ -175,16 +177,16 @@ public class WebRuntimeMonitor implements WebMonitor {
 		FiniteDuration timeout = AkkaUtils.getTimeout(config);
 		FiniteDuration lookupTimeout = AkkaUtils.getTimeout(config);
 
-		retriever = new JobManagerArchiveRetriever(this, actorSystem, lookupTimeout, timeout);
+		retriever = new JobManagerRetriever(this, actorSystem, lookupTimeout, timeout);
 
-		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(retriever);
+		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder();
 
 		router = new Router()
 			// config how to interact with this web server
 			.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
 
 			// the overview - how many task managers, slots, free slots, ...
-			.GET("/overview", handler(new ClusterOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT)))
+			.GET("/overview", handler(new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT)))
 
 			// job manager configuration, log and stdout
 			.GET("/jobmanager/config", handler(new JobManagerConfigHandler(config)))
@@ -192,9 +194,9 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobmanager/stdout", new StaticFileServerHandler(outDir))
 
 			// overview over jobs
-			.GET("/joboverview", handler(new CurrentJobsOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT, true, true)))
-			.GET("/joboverview/running", handler(new CurrentJobsOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT, true, false)))
-			.GET("/joboverview/completed", handler(new CurrentJobsOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT, false, true)))
+			.GET("/joboverview", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true)))
+			.GET("/joboverview/running", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false)))
+			.GET("/joboverview/completed", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true)))
 
 			.GET("/jobs", handler(new CurrentJobIdsHandler(retriever, DEFAULT_REQUEST_TIMEOUT)))
 
@@ -215,19 +217,13 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))
 
-			.GET("/taskmanagers", handler(new TaskManagersHandler(retriever, DEFAULT_REQUEST_TIMEOUT)))
+			.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
 			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(retriever, DEFAULT_REQUEST_TIMEOUT)))
 
 			// this handler serves all the static contents
-			.GET("/:*", new StaticFileServerHandler(webRootDir));
-	}
+			.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise, webRootDir));
 
-	@Override
-	public void start() throws Exception {
 		synchronized (startupShutdownLock) {
-			if (this.bootstrap != null) {
-				throw new IllegalStateException("The server has already been started");
-			}
 
 			// add shutdown hook for deleting the directory
 			try {
@@ -246,16 +242,16 @@ public class WebRuntimeMonitor implements WebMonitor {
 			}
 
 			ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
-	
+
 				@Override
 				protected void initChannel(SocketChannel ch) {
 					Handler handler = new Handler(router);
 
 					ch.pipeline()
-						.addLast(new HttpServerCodec())
-						.addLast(new HttpObjectAggregator(65536))
-						.addLast(new ChunkedWriteHandler())
-						.addLast(handler.name(), handler);
+							.addLast(new HttpServerCodec())
+							.addLast(new HttpObjectAggregator(65536))
+							.addLast(new ChunkedWriteHandler())
+							.addLast(handler.name(), handler);
 				}
 			};
 
@@ -276,7 +272,14 @@ public class WebRuntimeMonitor implements WebMonitor {
 			int port = bindAddress.getPort();
 
 			LOG.info("Web frontend listening at " + address + ':' + port);
+		}
+	}
 
+	@Override
+	public void start(String jobManagerAkkaUrl) throws Exception {
+		LOG.info("Starting with JobManager {} on port {}", jobManagerAkkaUrl, getServerPort());
+		synchronized (startupShutdownLock) {
+			jobManagerAddressPromise.success(jobManagerAkkaUrl);
 			leaderRetrievalService.start(retriever);
 		}
 	}
@@ -294,7 +297,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 				if (bootstrap.group() != null) {
 					bootstrap.group().shutdownGracefully();
 				}
-				this.bootstrap = null;
 			}
 
 			shutdown();
@@ -332,7 +334,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static RuntimeMonitorHandler handler(RequestHandler handler) {
-		return new RuntimeMonitorHandler(handler);
+	private RuntimeMonitorHandler handler(RequestHandler handler) {
+		return new RuntimeMonitorHandler(handler, retriever, jobManagerAddressPromise);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index 51e85b9..944407e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.files;
 /*****************************************************************************
  * This code is based on the "HttpStaticFileServerHandler" from the
  * Netty project's HTTP server example.
- * 
+ *
  * See http://netty.io and
  * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
  *****************************************************************************/
@@ -41,12 +41,18 @@ import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.KeepAliveWrite;
 import io.netty.handler.codec.http.router.Routed;
 import io.netty.util.CharsetUtil;
-
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.Promise;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -61,6 +67,7 @@ import java.util.GregorianCalendar;
 import java.util.Locale;
 import java.util.TimeZone;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
 import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -75,55 +82,72 @@ import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
 /**
- * Simple file server handler that serves requests to web frontend's static files, such as 
+ * Simple file server handler that serves requests to web frontend's static files, such as
  * HTML, CSS, or JS files.
- * 
+ *
  * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
  * example.</p>
  */
 @ChannelHandler.Sharable
 public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed> {
-	
+
 	/** Default logger, if none is specified */
 	private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(StaticFileServerHandler.class);
-	
+
 	/** Timezone in which this server answers its "if-modified" requests */
 	private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT");
 
 	/** Date format for HTTP */
 	private static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
-	
+
 	/** Be default, we allow files to be cached for 5 minutes */
 	private static final int HTTP_CACHE_SECONDS = 300;
 
 	// ------------------------------------------------------------------------
 
+	/** JobManager retriever */
+	private final JobManagerRetriever retriever;
+
+	private final Promise<String> localJobManagerAddressPromise;
+
 	/** The path in which the static documents are */
 	private final File rootPath;
 
 	/** The log for all error reporting */
 	private final Logger logger;
 
-	
-	public StaticFileServerHandler(File rootPath) {
-		this(rootPath, DEFAULT_LOGGER);
+	private String localJobManagerAddress;
+
+	public StaticFileServerHandler(
+			JobManagerRetriever retriever,
+			Promise<String> localJobManagerAddressPromise,
+			File rootPath) {
+
+		this(retriever, localJobManagerAddressPromise, rootPath, DEFAULT_LOGGER);
 	}
-	
-	public StaticFileServerHandler(File rootPath, Logger logger) {
-		if (rootPath == null || logger == null) {
-			throw new NullPointerException();
-		}
-		
-		this.rootPath = rootPath;
-		this.logger = logger;
+
+	public StaticFileServerHandler(
+			JobManagerRetriever retriever,
+			Promise<String> localJobManagerAddressPromise,
+			File rootPath,
+			Logger logger) {
+
+		this.retriever = checkNotNull(retriever);
+		this.localJobManagerAddressPromise = localJobManagerAddressPromise;
+		this.rootPath = checkNotNull(rootPath);
+		this.logger = checkNotNull(logger);
 	}
 
 	// ------------------------------------------------------------------------
 	//  Responses to requests
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
+		if (localJobManagerAddress == null) {
+			localJobManagerAddress = localJobManagerAddressPromise.future().value().get().get();
+		}
+
 		final HttpRequest request = routed.request();
 		String requestPath = routed.path();
 
@@ -139,6 +163,32 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 			requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.STDOUT_FILE_PATTERN);
 		}
 
+		Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
+
+		if (jobManager.isDefined()) {
+			// Redirect to leader if necessary
+			String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
+					localJobManagerAddress, jobManager.get());
+
+			if (redirectAddress != null) {
+				HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, requestPath);
+				KeepAliveWrite.flush(ctx, routed.request(), redirect);
+			}
+			else {
+				respondAsLeader(ctx, request, requestPath);
+			}
+		}
+		else {
+			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
+		}
+	}
+
+	/**
+	 * Response when running with leading JobManager.
+	 */
+	private void respondAsLeader(ChannelHandlerContext ctx, HttpRequest request, String requestPath)
+			throws ParseException, IOException {
+
 		// convert to absolute path
 		final File file = new File(rootPath, requestPath);
 
@@ -157,7 +207,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 				Files.copy(resourceStream, file.toPath());
 			}
 		}
-		
+
 		if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) {
 			sendError(ctx, NOT_FOUND);
 			return;
@@ -177,7 +227,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 				if (logger.isDebugEnabled()) {
 					logger.debug("Responding 'NOT MODIFIED' for file '" + file.getAbsolutePath() + '\'');
 				}
-				
+
 				sendNotModified(ctx);
 				return;
 			}
@@ -229,15 +279,15 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 			sendError(ctx, INTERNAL_SERVER_ERROR);
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Utilities to encode headers and responses
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Writes a simple  error response message.
-	 * 
-	 * @param ctx The channel context to write the response to.
+	 *
+	 * @param ctx    The channel context to write the response to.
 	 * @param status The response status.
 	 */
 	private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
@@ -279,7 +329,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 	/**
 	 * Sets the "date" and "cache" headers for the HTTP Response.
 	 *
-	 * @param response The HTTP response object.
+	 * @param response    The HTTP response object.
 	 * @param fileToCache File to extract the modification timestamp from.
 	 */
 	private static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {
@@ -301,7 +351,7 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 	 * Sets the content type header for the HTTP Response.
 	 *
 	 * @param response HTTP response
-	 * @param file file to extract content type
+	 * @param file     file to extract content type
 	 */
 	private static void setContentTypeHeader(HttpResponse response, File file) {
 		String mimeType = MimeTypes.getMimeTypeForFileName(file.getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index d9b4e59..4df387a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.NotFoundException;
 
@@ -33,14 +34,12 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
 	
 	private final ExecutionGraphHolder executionGraphHolder;
 	
-	
 	public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder) {
 		this.executionGraphHolder = executionGraphHolder;
 	}
-	
-	
+
 	@Override
-	public final String handleRequest(Map<String, String> params) throws Exception {
+	public final String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception {
 		String jidString = params.get("jobid");
 		if (jidString == null) {
 			throw new RuntimeException("JobId parameter missing");
@@ -54,7 +53,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
 			throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage()); 
 		}
 		
-		ExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid);
+		ExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
 		if (eg == null) {
 			throw new NotFoundException("Could not find job with id " + jid);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index dde368b..9fcf144 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -22,8 +22,6 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
-
-import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -31,31 +29,25 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.StringWriter;
 import java.util.Map;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * Responder that returns the status of the Flink cluster, such as how many
  * TaskManagers are currently connected, and how many jobs are running.
  */
-public class ClusterOverviewHandler implements  RequestHandler, RequestHandler.JsonResponse {
-
-	private final JobManagerArchiveRetriever retriever;
+public class ClusterOverviewHandler implements RequestHandler, RequestHandler.JsonResponse {
 
 	private final FiniteDuration timeout;
 	
 
-	public ClusterOverviewHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
-		if (retriever == null || timeout == null) {
-			throw new NullPointerException();
-		}
-		this.retriever = retriever;
-		this.timeout = timeout;
+	public ClusterOverviewHandler(FiniteDuration timeout) {
+		this.timeout = checkNotNull(timeout);
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> params) throws Exception {
+	public String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception {
 		// we need no parameters, get all requests
 		try {
-			ActorGateway jobManager = retriever.getJobManagerGateway();
-
 			if (jobManager != null) {
 				Future<Object> future = jobManager.ask(RequestStatusOverview.getInstance(), timeout);
 				StatusOverview overview = (StatusOverview) Await.result(future, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 049bd54..06fe34b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 
-import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -39,12 +39,12 @@ import java.util.Map;
  */
 public class CurrentJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse {
 
-	private final JobManagerArchiveRetriever retriever;
+	private final JobManagerRetriever retriever;
 
 	private final FiniteDuration timeout;
 
 
-	public CurrentJobIdsHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+	public CurrentJobIdsHandler(JobManagerRetriever retriever, FiniteDuration timeout) {
 		if (retriever == null || timeout == null) {
 			throw new NullPointerException();
 		}
@@ -53,11 +53,9 @@ public class CurrentJobIdsHandler implements RequestHandler, RequestHandler.Json
 	}
 	
 	@Override
-	public String handleRequest(Map<String, String> params) throws Exception {
+	public String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception {
 		// we need no parameters, get all requests
 		try {
-			ActorGateway jobManager = retriever.getJobManagerGateway();
-
 			if (jobManager != null) {
 				Future<Object> future = jobManager.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
 				JobsWithIDsOverview overview = (JobsWithIDsOverview) Await.result(future, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index d9bfcb7..3ca0420 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -24,8 +24,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
-
-import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -33,37 +31,33 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.StringWriter;
 import java.util.Map;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * Request handler that returns a summary of the job status.
  */
 public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandler.JsonResponse {
 
-	private final JobManagerArchiveRetriever retriever;
-	
 	private final FiniteDuration timeout;
 	
 	private final boolean includeRunningJobs;
 	private final boolean includeFinishedJobs;
 
 	
-	public CurrentJobsOverviewHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout,
-										boolean includeRunningJobs, boolean includeFinishedJobs) {
-		if (retriever == null || timeout == null) {
-			throw new NullPointerException();
-		}
-		this.retriever = retriever;
-		this.timeout = timeout;
+	public CurrentJobsOverviewHandler(
+			FiniteDuration timeout,
+			boolean includeRunningJobs,
+			boolean includeFinishedJobs) {
+
+		this.timeout = checkNotNull(timeout);
 		this.includeRunningJobs = includeRunningJobs;
 		this.includeFinishedJobs = includeFinishedJobs;
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> params) throws Exception {
+	public String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception {
 		try {
-			ActorGateway jobManager = retriever.getJobManagerGateway();
-
 			if (jobManager != null) {
-				
 				Future<Object> future = jobManager.ask(
 						new RequestJobDetails(includeRunningJobs, includeFinishedJobs), timeout);
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index ad72f0a..4027782 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import java.io.StringWriter;
@@ -66,7 +67,7 @@ public class DashboardConfigHandler implements RequestHandler, RequestHandler.Js
 	}
 	
 	@Override
-	public String handleRequest(Map<String, String> params) {
+	public String handleRequest(Map<String, String> params, ActorGateway jobManagerGateway) {
 		return this.configString;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
new file mode 100644
index 0000000..887c46e
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.files.MimeTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Utilities to extract a redirect address.
+ *
+ * <p>This is necessary at the moment, because many execution graph structures are not serializable.
+ * The proper solution here is to have these serializable and transparently work with the leading
+ * job manager instead of redirecting.
+ */
+public class HandlerRedirectUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HandlerRedirectUtils.class);
+
+	/** Pattern to extract the host from an remote Akka URL */
+	private final static Pattern LeaderAddressHostPattern = Pattern.compile("^.+@(.+):([0-9]+)/user/.+$");
+
+	public static String getRedirectAddress(
+			String localJobManagerAddress,
+			Tuple2<ActorGateway, Integer> leader) throws Exception {
+
+		final String leaderAddress = leader._1().path();
+		final int webMonitorPort = leader._2();
+
+		if (!localJobManagerAddress.equals(leaderAddress)) {
+			// We are not the leader and need to redirect
+			Matcher matcher = LeaderAddressHostPattern.matcher(leaderAddress);
+
+			if (matcher.matches()) {
+				String redirectAddress = String.format("%s:%d", matcher.group(1), webMonitorPort);
+				return redirectAddress;
+			}
+			else {
+				LOG.warn("Unexpected leader address pattern. Cannot extract host.");
+			}
+		}
+
+		return null;
+	}
+
+	public static HttpResponse getRedirectResponse(String redirectAddress, String path) throws Exception {
+		checkNotNull(redirectAddress, "Redirect address");
+		checkNotNull(path, "Path");
+
+		String newLocation = String.format("http://%s%s", redirectAddress, path);
+
+		HttpResponse redirectResponse = new DefaultFullHttpResponse(
+				HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT);
+		redirectResponse.headers().set(HttpHeaders.Names.LOCATION, newLocation);
+		redirectResponse.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+		redirectResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
+
+		return redirectResponse;
+	}
+
+	public static HttpResponse getUnavailableResponse() throws UnsupportedEncodingException {
+		String result = "Service temporarily unavailable due to an ongoing leader election. Please refresh.";
+		byte[] bytes = result.getBytes(Charset.forName("UTF-8"));
+
+		HttpResponse unavailableResponse = new DefaultFullHttpResponse(
+				HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, Unpooled.wrappedBuffer(bytes));
+
+		unavailableResponse.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+		unavailableResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytes.length);
+		unavailableResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, MimeTypes.getMimeTypeForExtension("txt"));
+
+		return unavailableResponse;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index 77314ec..3a0c774 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.ActorGateway;
 
 import java.io.StringWriter;
 import java.util.Map;
@@ -36,7 +37,7 @@ public class JobManagerConfigHandler implements RequestHandler, RequestHandler.J
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> params) throws Exception {
+	public String handleRequest(Map<String, String> params, ActorGateway jobManagerGateway) throws Exception {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 4ffb9d9..53d1179 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.instance.ActorGateway;
+
 import java.util.Map;
 
 public interface RequestHandler {
@@ -25,15 +27,15 @@ public interface RequestHandler {
 	/**
 	 * This interface marks handlers that return JSON data.
 	 */
-	public static interface JsonResponse {}
+	interface JsonResponse {}
 
 	/**
 	 * This interface marks handlers that return plain text data.
 	 */
-	public static interface TextResponse {}
+	interface TextResponse {}
 	
 	
 	// --------------------------------------------------------------------------------------------
 
-	String handleRequest(Map<String, String> params) throws Exception;
+	String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index dbdb439..0becb6a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
 import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
 import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
 import org.apache.flink.util.StringUtils;
@@ -36,27 +35,21 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class TaskManagersHandler implements  RequestHandler, RequestHandler.JsonResponse {
+import static com.google.common.base.Preconditions.checkNotNull;
 
-	private final JobManagerArchiveRetriever retriever;
+public class TaskManagersHandler implements RequestHandler, RequestHandler.JsonResponse {
 
 	private final FiniteDuration timeout;
 
 	public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
-
-	public TaskManagersHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
-		if (retriever == null || timeout == null) {
-			throw new NullPointerException();
-		}
-		this.retriever = retriever;
-		this.timeout = timeout;
+	
+	public TaskManagersHandler(FiniteDuration timeout) {
+		this.timeout = checkNotNull(timeout);
 	}
 
 	@Override
-	public String handleRequest(Map<String, String> params) throws Exception {
+	public String handleRequest(Map<String, String> params, ActorGateway jobManager) throws Exception {
 		try {
-			ActorGateway jobManager = retriever.getJobManagerGateway();
-
 			if (jobManager != null) {
 				// whether one task manager's metrics are requested, or all task manager, we
 				// return them in an array. This avoids unnecessary code complexity.
@@ -117,7 +110,8 @@ public class TaskManagersHandler implements  RequestHandler, RequestHandler.Json
 
 				gen.close();
 				return writer.toString();
-			} else {
+			}
+			else {
 				throw new Exception("No connection to the leading JobManager.");
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/77fc0cc4/flink-runtime-web/src/main/resources/web/css/index.css
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/resources/web/css/index.css b/flink-runtime-web/src/main/resources/web/css/index.css
new file mode 100644
index 0000000..43f7d98
--- /dev/null
+++ b/flink-runtime-web/src/main/resources/web/css/index.css
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#sidebar {
+  overflow: hidden;
+  position: fixed;
+  left: -250px;
+  top: 0;
+  bottom: 0;
+  height: 100%;
+  width: 250px;
+  background: #151515;
+  -webkit-transition: 400ms;
+  -moz-transition: 400ms;
+  -o-transition: 400ms;
+  -ms-transition: 400ms;
+  transition: 400ms;
+  -webkit-box-shadow: inset -10px 0px 10px rgba(0,0,0,0.2);
+  box-shadow: inset -10px 0px 10px rgba(0,0,0,0.2);
+}
+#sidebar.sidebar-visible {
+  left: 0;
+}
+#sidebar .logo {
+  width: auto;
+  height: 22px;
+}
+#sidebar .logo img {
+  display: inline-block;
+}
+#sidebar .navbar-static-top {
+  overflow: hidden;
+  height: 51px;
+}
+#sidebar .navbar-static-top .navbar-header {
+  width: 100%;
+}
+#sidebar .navbar-brand.navbar-brand-text {
+  font-size: 14px;
+  font-weight: bold;
+  color: #fff;
+  padding-left: 0;
+}
+#sidebar .nav > li > a {
+  color: #aaa;
+  margin-bottom: 1px;
+}
+#sidebar .nav > li > a:hover,
+#sidebar .nav > li > a:focus {
+  background-color: rgba(40,40,40,0.5);
+}
+#sidebar .nav > li > a.active {
+  background-color: rgba(100,100,100,0.5);
+}
+#content {
+  background-color: #fff;
+  overflow: hidden;
+  margin-left: 0;
+  padding-top: 70px;
+  -webkit-transition: 400ms;
+  -moz-transition: 400ms;
+  -o-transition: 400ms;
+  -ms-transition: 400ms;
+  transition: 400ms;
+}
+#content .navbar-main,
+#content .navbar-main-additional {
+  -webkit-transition: 400ms;
+  -moz-transition: 400ms;
+  -o-transition: 400ms;
+  -ms-transition: 400ms;
+  transition: 400ms;
+}
+#content .navbar-main-additional {
+  margin-top: 51px;
+  border-bottom: none;
+  padding: 0 20px;
+}
+#content .navbar-main-additional .nav-tabs {
+  margin: 0 -20px;
+  padding: 0 20px;
+}
+#content .navbar-secondary-additional {
+  border: none;
+  padding: 0 20px;
+  margin-bottom: 0;
+}
+#content .navbar-secondary-additional .nav-tabs {
+  margin: 0 -20px;
+}
+#content.sidebar-visible {
+  margin-left: 250px;
+}
+#content.sidebar-visible .navbar-main,
+#content.sidebar-visible .navbar-main-additional {
+  left: 250px;
+}
+#content #fold-button {
+  display: inline-block;
+  margin-left: 20px;
+}
+#content #content-inner {
+  padding: 0px 20px 20px 20px;
+}
+#content #content-inner.has-navbar-main-additional {
+  padding-top: 42px;
+}
+.page-header {
+  margin: 0 0 20px 0;
+}
+.nav > li > a,
+.nav > li > a:hover,
+.nav > li > a:focus {
+  color: #aaa;
+  background-color: transparent;
+  border-bottom: 2px solid transparent;
+}
+.nav > li.active > a,
+.nav > li.active > a:hover,
+.nav > li.active > a:focus {
+  color: #000;
+  border-bottom: 2px solid #000;
+}
+.nav.nav-tabs {
+  margin-bottom: 20px;
+}
+.table .table {
+  background-color: transparent;
+}
+.table th {
+  font-weight: normal;
+  color: #999;
+}
+.table td.td-long {
+  width: 20%;
+  white-space: pre-wrap;
+  white-space: -moz-pre-wrap;
+  white-space: -pre-wrap;
+  white-space: -o-pre-wrap;
+  word-wrap: break-word;
+}
+.table.table-clickable tr {
+  cursor: pointer;
+}
+.table.table-inner {
+  background-color: transparent;
+}
+.table.table-properties {
+  table-layout: fixed;
+  white-space: nowrap;
+}
+.table.table-properties td {
+  width: 50%;
+  white-space: nowrap;
+  overflow: hidden;
+  -o-text-overflow: ellipsis;
+  text-overflow: ellipsis;
+}
+.table.table-body-hover > tbody {
+  border-top: none;
+  border-left: 2px solid transparent;
+}
+.table.table-body-hover > tbody.active {
+  border-left: 2px solid #000;
+}
+.table.table-body-hover > tbody:hover td:not(.tab-column),
+.table.table-body-hover > tbody.active td:not(.tab-column) {
+  background-color: #f0f0f0;
+}
+.table.table-body-hover > tbody:hover td.tab-column li.active,
+.table.table-body-hover > tbody.active td.tab-column li.active {
+  background-color: #f0f0f0;
+}
+.table.table-activable th.tab-column,
+.table.table-activable td.tab-column {
+  border-top: none;
+  width: 47px;
+}
+.table.table-activable td.tab-column {
+  border-right: 1px solid #ddd;
+}
+.table.table-activable td {
+  position: relative;
+}
+.table .small-label {
+  text-transform: uppercase;
+  font-size: 13px;
+  color: #999;
+}
+.panel.panel-dashboard .huge {
+  font-size: 28px;
+}
+.panel.panel-lg {
+  font-size: 16px;
+}
+.panel.panel-lg .badge {
+  font-size: 14px;
+}
+.navbar-secondary {
+  overflow: auto;
+}
+.navbar-main .navbar-title,
+.navbar-secondary .navbar-title,
+.navbar-main-additional .navbar-title,
+.panel.panel-multi .navbar-title,
+.navbar-secondary-additional .navbar-title,
+.navbar-main .panel-title,
+.navbar-secondary .panel-title,
+.navbar-main-additional .panel-title,
+.panel.panel-multi .panel-title,
+.navbar-secondary-additional .panel-title {
+  float: left;
+  font-size: 18px;
+  padding: 12px 20px 13px 10px;
+  color: #333;
+  display: inline-block;
+}
+.navbar-main .navbar-info,
+.navbar-secondary .navbar-info,
+.navbar-main-additional .navbar-info,
+.panel.panel-multi .navbar-info,
+.navbar-secondary-additional .navbar-info,
+.navbar-main .panel-info,
+.navbar-secondary .panel-info,
+.navbar-main-additional .panel-info,
+.panel.panel-multi .panel-info,
+.navbar-secondary-additional .panel-info {
+  float: left;
+  font-size: 14px;
+  padding: 15px 15px 15px 15px;
+  color: #999;
+  display: inline-block;
+  border-right: 1px solid #e7e7e7;
+  overflow: hidden;
+}
+.navbar-main .navbar-info .overflow,
+.navbar-secondary .navbar-info .overflow,
+.navbar-main-additional .navbar-info .overflow,
+.panel.panel-multi .navbar-info .overflow,
+.navbar-secondary-additional .navbar-info .overflow,
+.navbar-main .panel-info .overflow,
+.navbar-secondary .panel-info .overflow,
+.navbar-main-additional .panel-info .overflow,
+.panel.panel-multi .panel-info .overflow,
+.navbar-secondary-additional .panel-info .overflow {
+  position: absolute;
+  display: block;
+  -o-text-overflow: ellipsis;
+  text-overflow: ellipsis;
+  overflow: hidden;
+  height: 22px;
+  line-height: 22px;
+  vertical-align: middle;
+}
+.navbar-main .navbar-info.first,
+.navbar-secondary .navbar-info.first,
+.navbar-main-additional .navbar-info.first,
+.panel.panel-multi .navbar-info.first,
+.navbar-secondary-additional .navbar-info.first,
+.navbar-main .panel-info.first,
+.navbar-secondary .panel-info.first,
+.navbar-main-additional .panel-info.first,
+.panel.panel-multi .panel-info.first,
+.navbar-secondary-additional .panel-info.first {
+  border-left: 1px solid #e7e7e7;
+}
+.navbar-main .navbar-info.last,
+.navbar-secondary .navbar-info.last,
+.navbar-main-additional .navbar-info.last,
+.panel.panel-multi .navbar-info.last,
+.navbar-secondary-additional .navbar-info.last,
+.navbar-main .panel-info.last,
+.navbar-secondary .panel-info.last,
+.navbar-main-additional .panel-info.last,
+.panel.panel-multi .panel-info.last,
+.navbar-secondary-additional .panel-info.last {
+  border-right: none;
+}
+.panel.panel-multi .panel-heading {
+  padding: 0;
+}
+.panel.panel-multi .panel-heading .panel-info.thin {
+  padding: 8px 10px;
+}
+.panel.panel-multi .panel-body {
+  padding: 10px;
+  background-color: #fdfdfd;
+  color: #999;
+  font-size: 13px;
+}
+.panel.panel-multi .panel-body.clean {
+  color: inherit;
+  font-size: inherit;
+}
+.navbar-main-additional,
+.navbar-secondary-additional {
+  min-height: 40px;
+  background-color: #fdfdfd;
+}
+.navbar-main-additional .navbar-info,
+.navbar-secondary-additional .navbar-info {
+  font-size: 13px;
+  padding: 10px 15px 10px 15px;
+}
+.nav-top-affix.affix {
+  width: 100%;
+  top: 50px;
+  margin-left: -20px;
+  padding-left: 20px;
+  margin-right: -20px;
+  padding-right: 20px;
+  background-color: #fff;
+  z-index: 1;
+}
+.badge-default[href]:hover,
+.badge-default[href]:focus {
+  background-color: #808080;
+}
+.badge-primary {
+  background-color: #428bca;
+}
+.badge-primary[href]:hover,
+.badge-primary[href]:focus {
+  background-color: #3071a9;
+}
+.badge-success {
+  background-color: #5cb85c;
+}
+.badge-success[href]:hover,
+.badge-success[href]:focus {
+  background-color: #449d44;
+}
+.badge-info {
+  background-color: #5bc0de;
+}
+.badge-info[href]:hover,
+.badge-info[href]:focus {
+  background-color: #31b0d5;
+}
+.badge-warning {
+  background-color: #f0ad4e;
+}
+.badge-warning[href]:hover,
+.badge-warning[href]:focus {
+  background-color: #ec971f;
+}
+.badge-danger {
+  background-color: #d9534f;
+}
+.badge-danger[href]:hover,
+.badge-danger[href]:focus {
+  background-color: #c9302c;
+}
+.indicator {
+  display: inline-block;
+  margin-right: 15px;
+}
+.indicator.indicator-primary {
+  color: #428bca;
+}
+.indicator.indicator-success {
+  color: #5cb85c;
+}
+.indicator.indicator-info {
+  color: #5bc0de;
+}
+.indicator.indicator-warning {
+  color: #f0ad4e;
+}
+.indicator.indicator-danger {
+  color: #d9534f;
+}
+pre.exception {
+  border: none;
+  background-color: transparent;
+  padding: 0;
+  margin: 0;
+}
+.nav-tabs.tabs-vertical {
+  position: absolute;
+  left: 0;
+  top: 0;
+  border-bottom: none;
+  z-index: 100;
+}
+.nav-tabs.tabs-vertical li {
+  float: none;
+  margin-bottom: 0;
+  margin-right: -1px;
+}
+.nav-tabs.tabs-vertical li > a {
+  margin-right: 0;
+  -webkit-border-radius: 0;
+  border-radius: 0;
+  border-bottom: none;
+  border-left: 2px solid transparent;
+}
+.nav-tabs.tabs-vertical li > a:hover,
+.nav-tabs.tabs-vertical li > a:focus {
+  border-bottom: none;
+  border-left: 2px solid #000;
+}
+.nav-tabs.tabs-vertical li.active > a {
+  border-bottom: none;
+  border-left: 2px solid #000;
+}
+.navbar-main .navbar-title,
+.navbar-secondary .navbar-title,
+.navbar-main-additional .navbar-title,
+.navbar-secondary-additional .navbar-title {
+  padding: 12px 20px 13px 20px;
+}
+livechart {
+  width: 30%;
+  height: 30%;
+  text-align: center;
+}
+.canvas-wrapper {
+  border: 1px solid #ddd;
+  position: relative;
+  margin-bottom: 20px;
+}
+.canvas-wrapper .main-canvas {
+  height: 400px;
+  overflow: hidden;
+}
+.canvas-wrapper .main-canvas .zoom-buttons {
+  position: absolute;
+  top: 10px;
+  right: 10px;
+}
+.label-group .label {
+  display: inline-block;
+  width: 2em;
+  padding-left: 0.1em;
+  padding-right: 0.1em;
+  margin: 0;
+  border-right: 1px solid #fff;
+  -webkit-border-radius: 0;
+  border-radius: 0;
+}
+.label-group .label.label-black {
+  background-color: #000;
+}
+svg.graph {
+  overflow: hidden;
+}
+svg.graph g.type-TK > rect {
+  fill: #00ffd0;
+}
+svg.graph text {
+  font-weight: 300;
+  font-size: 14px;
+}
+svg.graph .node {
+  cursor: pointer;
+}
+svg.graph .node > rect {
+  stroke: #999;
+  stroke-width: 5px;
+  fill: #fff;
+  margin: 0;
+  padding: 0;
+}
+svg.graph .node[active] > rect {
+  fill: #eee;
+}
+svg.graph .node.node-mirror > rect {
+  stroke: #a8a8a8;
+}
+svg.graph .node.node-iteration > rect {
+  stroke: #cd3333;
+}
+svg.graph .node.node-source > rect {
+  stroke: #4ce199;
+}
+svg.graph .node.node-sink > rect {
+  stroke: #e6ec8b;
+}
+svg.graph .node.node-normal > rect {
+  stroke: #3fb6d8;
+}
+svg.graph .node h4 {
+  color: #000;
+}
+svg.graph .node h5 {
+  color: #999;
+}
+svg.graph .edgeLabel rect {
+  fill: #fff;
+}
+svg.graph .edgePath path {
+  stroke: #333;
+  stroke-width: 2px;
+  fill: #333;
+}
+svg.graph .label {
+  color: #777;
+  margin: 0;
+}
+svg.graph .edge-label {
+  font-size: 14px;
+}
+svg.graph .node-label {
+  display: block;
+  margin: 0;
+  text-decoration: none;
+}
+.timeline {
+  overflow: hidden;
+}
+.timeline-canvas {
+  overflow: hidden;
+  padding: 10px;
+}
+.timeline-canvas .bar-container {
+  overflow: hidden;
+}
+.timeline-canvas .timeline-insidelabel,
+.timeline-canvas .timeline-series {
+  cursor: pointer;
+}
+.timeline-canvas.secondary .timeline-insidelabel,
+.timeline-canvas.secondary .timeline-series {
+  cursor: auto;
+}
+.qtip-timeline-bar {
+  font-size: 14px;
+  line-height: 1.4;
+}
+@media (min-width: 1024px) and (max-width: 1279px) {
+  #sidebar {
+    left: 0;
+    width: 160px;
+  }
+  #sidebar .navbar-static-top .navbar-brand-text {
+    display: none;
+  }
+  #content {
+    margin-left: 160px;
+  }
+  #content #fold-button {
+    display: none;
+  }
+  #content .navbar-main,
+  #content .navbar-main-additional {
+    left: 160px;
+  }
+  .table td.td-long {
+    width: 20%;
+  }
+}
+@media (min-width: 1280px) {
+  #sidebar {
+    left: 0;
+  }
+  #content {
+    margin-left: 250px;
+  }
+  #content #fold-button {
+    display: none;
+  }
+  #content .navbar-main,
+  #content .navbar-main-additional {
+    left: 250px;
+  }
+  .table td.td-long {
+    width: 30%;
+  }
+}
+#total-mem {
+  background-color: #7cb5ec;
+}
+#heap-mem {
+  background-color: #434348;
+}
+#non-heap-mem {
+  background-color: #90ed7d;
+}
+a.show-pointer {
+  cursor: pointer;
+}