You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/17 20:20:23 UTC

[51/51] [abbrv] flink git commit: [FLINK-2688] [monitoring api] Integrate monitoring request handler with HA leader handling

[FLINK-2688] [monitoring api] Integrate monitoring request handler with HA leader handling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/506ce61f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/506ce61f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/506ce61f

Branch: refs/heads/master
Commit: 506ce61f66b862037bc57c7c25ae838bd762f2ac
Parents: 8db523c
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 16 22:04:19 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 17 14:21:53 2015 +0200

----------------------------------------------------------------------
 .../optimizer/plantranslate/JsonMapper.java     |  8 +-
 .../webmonitor/ExecutionGraphHolder.java        | 56 +++++++-----
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 43 ++++++++--
 .../handlers/ClusterOverviewHandler.java        | 56 ++++++------
 .../handlers/CurrentJobIdsHandler.java          | 90 +++++++++++---------
 .../handlers/CurrentJobsOverviewHandler.java    | 82 ++++++++++--------
 .../runtime/messages/GenericMessageTester.java  |  2 +-
 7 files changed, 207 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
index 4d81058..d5ddf4d 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
@@ -107,8 +107,14 @@ public class JsonMapper {
 			case NESTEDLOOP_STREAMED_OUTER_SECOND:
 				return "Nested Loops (Streamed Outer: " + secondInputName + ")";
 
-			case MERGE:
+			case INNER_MERGE:
 				return "Merge";
+			case FULL_OUTER_MERGE:
+				return "Full Outer Merge";
+			case LEFT_OUTER_MERGE:
+				return "Left Outer Merge";
+			case RIGHT_OUTER_MERGE:
+				return "Right Outer Merge";
 
 			case CO_GROUP:
 				return "Co-Group";

http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 74278a1..09ede4c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -30,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.util.WeakHashMap;
 
 /**
- * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archiver.
+ * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archive.
  * <p>
  * The holder will cache the ExecutionGraph behind a weak reference, which will be cleared
  * at some point once no one else is pointing to the ExecutionGraph.
@@ -39,26 +41,33 @@ import java.util.WeakHashMap;
  */
 public class ExecutionGraphHolder {
 
-	private final ActorGateway source;
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
+
+	/** Retrieves the current leading JobManager and its corresponding archive */
+	private final JobManagerArchiveRetriever retriever;
 
 	private final FiniteDuration timeout;
 
 	private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
 
-
-	public ExecutionGraphHolder(ActorGateway source) {
-		this(source, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+	public ExecutionGraphHolder(JobManagerArchiveRetriever retriever) {
+		this(retriever, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
 
-	public ExecutionGraphHolder(ActorGateway source, FiniteDuration timeout) {
-		if (source == null || timeout == null) {
+	public ExecutionGraphHolder(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+		if (retriever == null || timeout == null) {
 			throw new NullPointerException();
 		}
-		this.source = source;
+		this.retriever = retriever;
 		this.timeout = timeout;
 	}
 
-
+	/**
+	 * Retrieves the execution graph with {@link JobID} jid or null if it cannot be found.
+	 *
+	 * @param jid jobID of the execution graph to be retrieved
+	 * @return the retrieved execution graph or null if it is not retrievable
+	 */
 	public ExecutionGraph getExecutionGraph(JobID jid) {
 		ExecutionGraph cached = cache.get(jid);
 		if (cached != null) {
@@ -66,18 +75,27 @@ public class ExecutionGraphHolder {
 		}
 
 		try {
-			Future<Object> future = source.ask(new JobManagerMessages.RequestJob(jid), timeout);
-			Object result = Await.result(future, timeout);
-			if (result instanceof JobManagerMessages.JobNotFound) {
-				return null;
-			}
-			else if (result instanceof JobManagerMessages.JobFound) {
-				ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
-				cache.put(jid, eg);
-				return eg;
+			ActorGateway jobManager = retriever.getJobManagerGateway();
+
+			if (jobManager != null) {
+				Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
+				Object result = Await.result(future, timeout);
+				
+				if (result instanceof JobManagerMessages.JobNotFound) {
+					return null;
+				}
+				else if (result instanceof JobManagerMessages.JobFound) {
+					ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
+					cache.put(jid, eg);
+					return eg;
+				}
+				else {
+					throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
+				}
 			}
 			else {
-				throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
+				LOG.warn("No connection to the leading JobManager.");
+				return null;
 			}
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 4633dcf..a34823e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import akka.actor.ActorSystem;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -33,7 +35,8 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
@@ -63,6 +66,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * The root component of the web runtime monitor. This class starts the web server and creates
  * all request handlers for the REST API.
@@ -84,7 +89,13 @@ public class WebRuntimeMonitor implements WebMonitor {
 	
 	// ------------------------------------------------------------------------
 	
+	/** Guarding concurrent modifications to the server channel pipeline during startup and shutdown */
 	private final Object startupShutdownLock = new Object();
+
+	private final LeaderRetrievalService leaderRetrievalService;
+
+	/** LeaderRetrievalListener which stores the currently leading JobManager and its archive */
+	private final JobManagerArchiveRetriever retriever;
 	
 	private final Router router;
 
@@ -95,7 +106,12 @@ public class WebRuntimeMonitor implements WebMonitor {
 	private Channel serverChannel;
 
 	
-	public WebRuntimeMonitor(Configuration config, ActorGateway jobManager, ActorGateway archive) throws IOException {
+	public WebRuntimeMonitor(
+				Configuration config,
+				LeaderRetrievalService leaderRetrievalService,
+				ActorSystem actorSystem) throws IOException
+	{
+		this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
 		
 		final WebMonitorConfig cfg = new WebMonitorConfig(config);
 		
@@ -127,22 +143,27 @@ public class WebRuntimeMonitor implements WebMonitor {
 		if (this.configuredPort < 0) {
 			throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
 		}
+
+		FiniteDuration timeout = AkkaUtils.getTimeout(config);
+		FiniteDuration lookupTimeout = AkkaUtils.getTimeout(config);
+
+		retriever = new JobManagerArchiveRetriever(this, actorSystem, lookupTimeout, timeout);
 		
-		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(jobManager);
-		
+		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(retriever);
+
 		router = new Router()
 			// config how to interact with this web server
 			.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
 
 			// the overview - how many task managers, slots, free slots, ...
-			.GET("/overview", handler(new ClusterOverviewHandler(jobManager, DEFAULT_REQUEST_TIMEOUT)))
+			.GET("/overview", handler(new ClusterOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT)))
 
 			// overview over jobs
-			.GET("/joboverview", handler(new CurrentJobsOverviewHandler(jobManager, DEFAULT_REQUEST_TIMEOUT, true, true)))
-			.GET("/joboverview/running", handler(new CurrentJobsOverviewHandler(jobManager, DEFAULT_REQUEST_TIMEOUT, true, false)))
-			.GET("/joboverview/completed", handler(new CurrentJobsOverviewHandler(jobManager, DEFAULT_REQUEST_TIMEOUT, false, true)))
+			.GET("/joboverview", handler(new CurrentJobsOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT, true, true)))
+			.GET("/joboverview/running", handler(new CurrentJobsOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT, true, false)))
+			.GET("/joboverview/completed", handler(new CurrentJobsOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT, false, true)))
 
-			.GET("/jobs", handler(new CurrentJobIdsHandler(jobManager, DEFAULT_REQUEST_TIMEOUT)))
+			.GET("/jobs", handler(new CurrentJobIdsHandler(retriever, DEFAULT_REQUEST_TIMEOUT)))
 
 			.GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs)))
@@ -203,12 +224,16 @@ public class WebRuntimeMonitor implements WebMonitor {
 			int port = bindAddress.getPort();
 			
 			LOG.info("Web frontend listening at " + address + ':' + port);
+
+			leaderRetrievalService.start(retriever);
 		}
 	}
 	
 	@Override
 	public void stop() throws Exception {
 		synchronized (startupShutdownLock) {
+			leaderRetrievalService.stop();
+			
 			if (this.serverChannel != null) {
 				this.serverChannel.close().awaitUninterruptibly();
 				this.serverChannel = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index 2d8b217..dde368b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 
+import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -35,44 +36,51 @@ import java.util.Map;
  * TaskManagers are currently connected, and how many jobs are running.
  */
 public class ClusterOverviewHandler implements  RequestHandler, RequestHandler.JsonResponse {
-	
-	private final ActorGateway jobManager;
-	
+
+	private final JobManagerArchiveRetriever retriever;
+
 	private final FiniteDuration timeout;
 	
-	
-	public ClusterOverviewHandler(ActorGateway jobManager, FiniteDuration timeout) {
-		if (jobManager == null || timeout == null) {
+
+	public ClusterOverviewHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+		if (retriever == null || timeout == null) {
 			throw new NullPointerException();
 		}
-		this.jobManager = jobManager;
+		this.retriever = retriever;
 		this.timeout = timeout;
 	}
-	
+
 	@Override
 	public String handleRequest(Map<String, String> params) throws Exception {
+		// we need no parameters, get all requests
 		try {
-			Future<Object> future = jobManager.ask(RequestStatusOverview.getInstance(), timeout);
-			StatusOverview overview = (StatusOverview) Await.result(future, timeout);
+			ActorGateway jobManager = retriever.getJobManagerGateway();
+
+			if (jobManager != null) {
+				Future<Object> future = jobManager.ask(RequestStatusOverview.getInstance(), timeout);
+				StatusOverview overview = (StatusOverview) Await.result(future, timeout);
 
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+				StringWriter writer = new StringWriter();
+				JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
 
-			gen.writeStartObject();
-			gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
-			gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
-			gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
-			gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
-			gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
-			gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
-			gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
-			gen.writeEndObject();
+				gen.writeStartObject();
+				gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
+				gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
+				gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
+				gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
+				gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
+				gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
+				gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
+				gen.writeEndObject();
 
-			gen.close();
-			return writer.toString();
+				gen.close();
+				return writer.toString();
+			} else {
+				throw new Exception("No connection to the leading JobManager.");
+			}
 		}
 		catch (Exception e) {
-			throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);
+			throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index e7e96e2..049bd54 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 
+import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -37,17 +38,17 @@ import java.util.Map;
  * given the JobManager or Archive Actor Reference.
  */
 public class CurrentJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse {
-	
-	private final ActorGateway target;
-	
+
+	private final JobManagerArchiveRetriever retriever;
+
 	private final FiniteDuration timeout;
-	
-	
-	public CurrentJobIdsHandler(ActorGateway target, FiniteDuration timeout) {
-		if (target == null || timeout == null) {
+
+
+	public CurrentJobIdsHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+		if (retriever == null || timeout == null) {
 			throw new NullPointerException();
 		}
-		this.target = target;
+		this.retriever = retriever;
 		this.timeout = timeout;
 	}
 	
@@ -55,42 +56,49 @@ public class CurrentJobIdsHandler implements RequestHandler, RequestHandler.Json
 	public String handleRequest(Map<String, String> params) throws Exception {
 		// we need no parameters, get all requests
 		try {
-			Future<Object> future = target.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
-			JobsWithIDsOverview overview = (JobsWithIDsOverview) Await.result(future, timeout);
-
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
-
-			gen.writeStartObject();
+			ActorGateway jobManager = retriever.getJobManagerGateway();
 
-			gen.writeArrayFieldStart("jobs-running");
-			for (JobID jid : overview.getJobsRunningOrPending()) {
-				gen.writeString(jid.toString());
-			}
-			gen.writeEndArray();
-
-			gen.writeArrayFieldStart("jobs-finished");
-			for (JobID jid : overview.getJobsFinished()) {
-				gen.writeString(jid.toString());
-			}
-			gen.writeEndArray();
-
-			gen.writeArrayFieldStart("jobs-cancelled");
-			for (JobID jid : overview.getJobsCancelled()) {
-				gen.writeString(jid.toString());
+			if (jobManager != null) {
+				Future<Object> future = jobManager.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
+				JobsWithIDsOverview overview = (JobsWithIDsOverview) Await.result(future, timeout);
+	
+				StringWriter writer = new StringWriter();
+				JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+	
+				gen.writeStartObject();
+	
+				gen.writeArrayFieldStart("jobs-running");
+				for (JobID jid : overview.getJobsRunningOrPending()) {
+					gen.writeString(jid.toString());
+				}
+				gen.writeEndArray();
+	
+				gen.writeArrayFieldStart("jobs-finished");
+				for (JobID jid : overview.getJobsFinished()) {
+					gen.writeString(jid.toString());
+				}
+				gen.writeEndArray();
+	
+				gen.writeArrayFieldStart("jobs-cancelled");
+				for (JobID jid : overview.getJobsCancelled()) {
+					gen.writeString(jid.toString());
+				}
+				gen.writeEndArray();
+	
+				gen.writeArrayFieldStart("jobs-failed");
+				for (JobID jid : overview.getJobsFailed()) {
+					gen.writeString(jid.toString());
+				}
+				gen.writeEndArray();
+	
+				gen.writeEndObject();
+	
+				gen.close();
+				return writer.toString();
 			}
-			gen.writeEndArray();
-
-			gen.writeArrayFieldStart("jobs-failed");
-			for (JobID jid : overview.getJobsFailed()) {
-				gen.writeString(jid.toString());
+			else {
+				throw new Exception("No connection to the leading JobManager.");
 			}
-			gen.writeEndArray();
-
-			gen.writeEndObject();
-
-			gen.close();
-			return writer.toString();
 		}
 		catch (Exception e) {
 			throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 6444e4b..d9bfcb7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 
+import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -36,8 +37,8 @@ import java.util.Map;
  * Request handler that returns a summary of the job status.
  */
 public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandler.JsonResponse {
-	
-	private final ActorGateway jobManager;
+
+	private final JobManagerArchiveRetriever retriever;
 	
 	private final FiniteDuration timeout;
 	
@@ -45,9 +46,12 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
 	private final boolean includeFinishedJobs;
 
 	
-	public CurrentJobsOverviewHandler(ActorGateway jobManager, FiniteDuration timeout,
+	public CurrentJobsOverviewHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout,
 										boolean includeRunningJobs, boolean includeFinishedJobs) {
-		this.jobManager = jobManager;
+		if (retriever == null || timeout == null) {
+			throw new NullPointerException();
+		}
+		this.retriever = retriever;
 		this.timeout = timeout;
 		this.includeRunningJobs = includeRunningJobs;
 		this.includeFinishedJobs = includeFinishedJobs;
@@ -56,42 +60,50 @@ public class CurrentJobsOverviewHandler implements RequestHandler, RequestHandle
 	@Override
 	public String handleRequest(Map<String, String> params) throws Exception {
 		try {
-			Future<Object> future = jobManager.ask(
-					new RequestJobDetails(includeRunningJobs, includeFinishedJobs), timeout);
-			
-			MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
-
-			final long now = System.currentTimeMillis();
-
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
-			gen.writeStartObject();
-			
-			
-			if (includeRunningJobs && includeFinishedJobs) {
-				gen.writeArrayFieldStart("running");
-				for (JobDetails detail : result.getRunningJobs()) {
-					generateSingleJobDetails(detail, gen, now);
+			ActorGateway jobManager = retriever.getJobManagerGateway();
+
+			if (jobManager != null) {
+				
+				Future<Object> future = jobManager.ask(
+						new RequestJobDetails(includeRunningJobs, includeFinishedJobs), timeout);
+				
+				MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
+	
+				final long now = System.currentTimeMillis();
+	
+				StringWriter writer = new StringWriter();
+				JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
+				gen.writeStartObject();
+				
+				
+				if (includeRunningJobs && includeFinishedJobs) {
+					gen.writeArrayFieldStart("running");
+					for (JobDetails detail : result.getRunningJobs()) {
+						generateSingleJobDetails(detail, gen, now);
+					}
+					gen.writeEndArray();
+	
+					gen.writeArrayFieldStart("finished");
+					for (JobDetails detail : result.getFinishedJobs()) {
+						generateSingleJobDetails(detail, gen, now);
+					}
+					gen.writeEndArray();
 				}
-				gen.writeEndArray();
-
-				gen.writeArrayFieldStart("finished");
-				for (JobDetails detail : result.getFinishedJobs()) {
-					generateSingleJobDetails(detail, gen, now);
+				else {
+					gen.writeArrayFieldStart("jobs");
+					for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
+						generateSingleJobDetails(detail, gen, now);
+					}
+					gen.writeEndArray();
 				}
-				gen.writeEndArray();
+	
+				gen.writeEndObject();
+				gen.close();
+				return writer.toString();
 			}
 			else {
-				gen.writeArrayFieldStart("jobs");
-				for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
-					generateSingleJobDetails(detail, gen, now);
-				}
-				gen.writeEndArray();
+				throw new Exception("No connection to the leading JobManager.");
 			}
-
-			gen.writeEndObject();
-			gen.close();
-			return writer.toString();
 		}
 		catch (Exception e) {
 			throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/flink/blob/506ce61f/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java
index dcf4839..76c1bd0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.messages;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
 
 import java.io.Serializable;
 import java.lang.reflect.Constructor;