You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/17 20:19:45 UTC

[13/51] [abbrv] flink git commit: [FLINK-2415] [web dashboard] Provide more data in the job overview responses

[FLINK-2415] [web dashboard] Provide more data in the job overview responses

Rather than only listing job ids, this now includes all information necessary to describe the job.
This reduces the number of requests for the job overview page to a single request.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/628e5f71
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/628e5f71
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/628e5f71

Branch: refs/heads/master
Commit: 628e5f716d0050674da5251f254b21b6012a1519
Parents: 4825c8c
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 20 14:48:13 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 17 14:21:50 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/webmonitor/JsonFactory.java   | 136 +++++++---
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 111 ++++----
 .../runtime/webmonitor/files/MimeTypes.java     |   6 +-
 .../webmonitor/handlers/JobSummaryHandler.java  |  68 +----
 .../handlers/JobsOverviewHandler.java           |  74 ++++++
 .../handlers/RequestOverviewHandler.java        |  17 +-
 .../webmonitor/handlers/TextResponder.java      |  38 ---
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +
 .../runtime/messages/webmonitor/JobDetails.java | 152 +++++++++++
 .../webmonitor/JobsWithIDsOverview.java         |  38 ++-
 .../webmonitor/MultipleJobsDetails.java         |  81 ++++++
 .../messages/webmonitor/RequestJobDetails.java  |  80 ++++++
 .../runtime/webmonitor/WebMonitorUtils.java     |  60 +++++
 .../flink/runtime/jobmanager/JobManager.scala   |  22 +-
 .../runtime/jobmanager/MemoryArchivist.scala    |  56 ++--
 .../runtime/messages/GenericMessageTester.java  | 264 +++++++++++++++++++
 .../messages/WebMonitorMessagesTest.java        | 146 ++++++++++
 .../testingUtils/TestingMemoryArchivist.scala   |   2 +-
 18 files changed, 1127 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
index 66449e0..40d095f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JsonFactory.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.webmonitor;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 
@@ -37,7 +39,8 @@ public class JsonFactory {
 
 	private static final com.fasterxml.jackson.core.JsonFactory jacksonFactory =
 			new com.fasterxml.jackson.core.JsonFactory();
-
+	
+	
 	public static String generateConfigJSON(long refreshInterval, long timeZoneOffset, String timeZoneName) {
 		try {
 			StringWriter writer = new StringWriter();
@@ -82,6 +85,103 @@ public class JsonFactory {
 		}
 	}
 
+	public static String generateJobDetailsJSON(JobDetails job) {
+		try {
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = jacksonFactory.createJsonGenerator(writer);
+			
+			generateSingleJobDetails(job, gen);
+			
+			gen.close();
+			return writer.toString();
+		}
+		catch (Exception e) {
+			// this should not happen
+			throw new RuntimeException(e.getMessage(), e);
+		}
+	}
+	
+	public static String generateMultipleJobsDetailsJSON(JobDetails[] jobs) {
+		try {
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = jacksonFactory.createJsonGenerator(writer);
+
+			gen.writeStartObject();
+			gen.writeArrayFieldStart("jobs");
+			for (JobDetails detail : jobs) {
+				generateSingleJobDetails(detail, gen);
+			}
+			gen.writeEndArray();
+			gen.writeEndObject();
+
+			gen.close();
+			return writer.toString();
+		}
+		catch (Exception e) {
+			// this should not happen
+			throw new RuntimeException(e.getMessage(), e);
+		}
+	}
+	
+	public static String generateRunningAndFinishedJobDetailsJSON(JobDetails[] runningJobs, JobDetails[] finishedJobs) {
+		try {
+			StringWriter writer = new StringWriter();
+			JsonGenerator gen = jacksonFactory.createJsonGenerator(writer);
+
+			gen.writeStartObject();
+
+			gen.writeArrayFieldStart("running");
+			for (JobDetails detail : runningJobs) {
+				generateSingleJobDetails(detail, gen);
+			}
+			gen.writeEndArray();
+
+			gen.writeArrayFieldStart("finished");
+			for (JobDetails detail : finishedJobs) {
+				generateSingleJobDetails(detail, gen);
+			}
+			gen.writeEndArray();
+			
+			gen.writeEndObject();
+
+			gen.close();
+			return writer.toString();
+		}
+		catch (Exception e) {
+			// this should not happen
+			throw new RuntimeException(e.getMessage(), e);
+		}
+	}
+	
+	private static void generateSingleJobDetails(JobDetails details, JsonGenerator gen) throws Exception {
+		gen.writeStartObject();
+		
+		gen.writeStringField("jid", details.getJobId().toString());
+		gen.writeStringField("name", details.getJobName());
+		gen.writeStringField("state", details.getStatus().name());
+		gen.writeNumberField("start-time", details.getStartTime());
+		gen.writeNumberField("end-time", details.getEndTime());
+		gen.writeNumberField("last-modification", details.getLastUpdateTime());
+
+		gen.writeObjectFieldStart("tasks");
+		gen.writeNumberField("total", details.getNumTasks());
+
+		final int[] perState = details.getNumVerticesPerExecutionState();
+		gen.writeNumberField("pending", perState[ExecutionState.CREATED.ordinal()] +
+				perState[ExecutionState.SCHEDULED.ordinal()] +
+										perState[ExecutionState.DEPLOYING.ordinal()]);
+		gen.writeNumberField("running", perState[ExecutionState.RUNNING.ordinal()]);
+		gen.writeNumberField("finished", perState[ExecutionState.FINISHED.ordinal()]);
+		gen.writeNumberField("canceling", perState[ExecutionState.CANCELING.ordinal()]);
+		gen.writeNumberField("canceled", perState[ExecutionState.CANCELED.ordinal()]);
+		gen.writeNumberField("failed", perState[ExecutionState.FAILED.ordinal()]);
+		gen.writeEndObject();
+
+		gen.writeEndObject();
+	}
+	
+	
+	
 	
 	public static String generateJobsOverviewJSON(JobsWithIDsOverview overview) {
 		try {
@@ -122,40 +222,6 @@ public class JsonFactory {
 		}
 	}
 	
-	public static String createJobSummaryJSON(JobID jid, String jobName, String state,
-												String start, String end, String duration,
-												int numOperators, int numOperatorsPending,
-												int numOperatorsRunning, int numOperatorsFinished,
-												int numOperatorsCanceling, int numOperatorsCanceled,
-												int numOperatorsFailed) {
-		try {
-			JSONObject json = new JSONObject();
-
-			json.put("jid", jid.toString());
-			json.put("name", jobName);
-			json.put("state", state);
-			json.put("start-time", start);
-			json.put("end-time", end);
-			json.put("duration", duration);
-			
-			JSONObject operators = new JSONObject();
-			operators.put("total", numOperators);
-			operators.put("pending", numOperatorsPending);
-			operators.put("running", numOperatorsRunning);
-			operators.put("finished", numOperatorsFinished);
-			operators.put("canceling", numOperatorsCanceling);
-			operators.put("canceled", numOperatorsCanceled);
-			operators.put("failed", numOperatorsFailed);
-			json.put("operators", operators);
-			
-			return json.toString(2);
-		}
-		catch (JSONException e) {
-			// this should not happen
-			throw new RuntimeException(e);
-		}
-	}
-	
 	// --------------------------------------------------------------------------------------------
 	
 	/** Don't instantiate */

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 5c7dc6b..85b2171 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.webmonitor.handlers.ExecutionPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobSummaryHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVerticesOverviewHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobsOverviewHandler;
 import org.apache.flink.runtime.webmonitor.handlers.RequestConfigHandler;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.RequestJobIdsHandler;
@@ -57,7 +58,7 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * The root component of the web runtime monitor.
- *
+ * 
  * <p>The web runtime monitor is based in Netty HTTP. It uses the Netty-Router library to route
  * HTTP requests of different paths to different response handlers. In addition, it serves the static
  * files of the web frontend, such as HTML, CSS, or JS files.</p>
@@ -65,36 +66,34 @@ import java.util.concurrent.TimeUnit;
 public class WebRuntimeMonitor implements WebMonitor {
 
 	public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
-
-	public static final long DEFAULT_REFRESH_INTERVAL = 5000;
-
+	
 	/** Logger for web frontend startup / shutdown messages */
 	private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
-
+	
 	/** Teh default path under which the static contents is stored */
 	private static final String STATIC_CONTENTS_PATH = "resources/web-runtime-monitor";
-
+	
 	// ------------------------------------------------------------------------
-
+	
 	private final Object startupShutdownLock = new Object();
-
+	
 	private final Router router;
 
 	private final int configuredPort;
 
 	private ServerBootstrap bootstrap;
-
+	
 	private Channel serverChannel;
 
-
+	
 	public WebRuntimeMonitor(Configuration config, ActorGateway jobManager, ActorGateway archive) throws IOException {
-
+		
 		final WebMonitorConfig cfg = new WebMonitorConfig(config);
-
+		
 		// figure out where our static contents is
 		final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
 		final String configuredWebRoot = cfg.getWebRoot();
-
+		
 		final File webRootDir;
 		if (configuredWebRoot != null) {
 			webRootDir = new File(configuredWebRoot);
@@ -103,48 +102,52 @@ public class WebRuntimeMonitor implements WebMonitor {
 			webRootDir = new File(flinkRoot, STATIC_CONTENTS_PATH);
 		}
 		else {
-			throw new IllegalConfigurationException("The given configuration provides neither the web-document root ("
+			throw new IllegalConfigurationException("The given configuration provides neither the web-document root (" 
 					+ WebMonitorConfig.JOB_MANAGER_WEB_DOC_ROOT_KEY + "), not the Flink installation root ("
 					+ ConfigConstants.FLINK_BASE_DIR_PATH_KEY + ").");
 		}
-
+		
 		// validate that the doc root is a valid directory
 		if (!(webRootDir.exists() && webRootDir.isDirectory() && webRootDir.canRead())) {
-			throw new IllegalConfigurationException("The path to the static contents (" +
+			throw new IllegalConfigurationException("The path to the static contents (" + 
 					webRootDir.getAbsolutePath() + ") is not a readable directory.");
 		}
-
+		
 		// port configuration
 		this.configuredPort = cfg.getWebFrontendPort();
 		if (this.configuredPort < 0) {
 			throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
 		}
-
+		
 		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(jobManager);
-
+		
 		router = new Router()
-				// config how to interact with this web server
-				.GET("/config", handler(new RequestConfigHandler(cfg.getRefreshInterval())))
+			// config how to interact with this web server
+			.GET("/config", handler(new RequestConfigHandler(cfg.getRefreshInterval())))
+			
+			// the overview - how many task managers, slots, free slots, ...
+			.GET("/overview", handler(new RequestOverviewHandler(jobManager, DEFAULT_REQUEST_TIMEOUT)))
 
-						// the overview - how many task managers, slots, free slots, ...
-				.GET("/overview", handler(new RequestOverviewHandler(jobManager)))
+			// list of job ids for all jobs in each status
+			.GET("/jobids", handler(new RequestJobIdsHandler(jobManager, DEFAULT_REQUEST_TIMEOUT)))
 
-						// currently running jobs
-				.GET("/jobs", handler(new RequestJobIdsHandler(jobManager)))
-				.GET("/jobs/:jobid", handler(new JobSummaryHandler(currentGraphs)))
-				.GET("/jobs/:jobid/vertices", handler(new JobVerticesOverviewHandler(currentGraphs)))
-				.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)))
-				.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
+			// overview over jobs
+			.GET("/joboverview", handler(new JobsOverviewHandler(jobManager, DEFAULT_REQUEST_TIMEOUT, true, true)))
+			.GET("/joboverview/running",handler(new JobsOverviewHandler(jobManager, DEFAULT_REQUEST_TIMEOUT, true, false)))
+			.GET("/joboverview/completed", handler(new JobsOverviewHandler(jobManager, DEFAULT_REQUEST_TIMEOUT, false, true)))
 
-//			.GET("/running/:jobid/:jobvertex", handler(new ExecutionPlanHandler(currentGraphs)))
-
-						// the handler for the legacy requests
-				.GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
-
-						// this handler serves all the static contents
-				.GET("/:*", new StaticFileServerHandler(webRootDir));
+			.GET("/jobs/:jobid", handler(new JobSummaryHandler(currentGraphs)))
+			.GET("/jobs/:jobid/vertices", handler(new JobVerticesOverviewHandler(currentGraphs)))
+			.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)))
+			.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
 
+//			.GET("/running/:jobid/:jobvertex", handler(new ExecutionPlanHandler(currentGraphs)))
 
+			// the handler for the legacy requests
+			.GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
+					
+			// this handler serves all the static contents
+			.GET("/:*", new StaticFileServerHandler(webRootDir));
 	}
 
 	@Override
@@ -153,41 +156,41 @@ public class WebRuntimeMonitor implements WebMonitor {
 			if (this.bootstrap != null) {
 				throw new IllegalStateException("The server has already been started");
 			}
-
+			
 			ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
-
+	
 				@Override
 				protected void initChannel(SocketChannel ch) {
 					Handler handler = new Handler(router);
-
+					
 					ch.pipeline()
-							.addLast(new HttpServerCodec())
-							.addLast(new HttpObjectAggregator(65536))
-							.addLast(new ChunkedWriteHandler())
-							.addLast(handler.name(), handler);
+						.addLast(new HttpServerCodec())
+						.addLast(new HttpObjectAggregator(65536))
+						.addLast(new ChunkedWriteHandler())
+						.addLast(handler.name(), handler);
 				}
 			};
-
+			
 			NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
 			NioEventLoopGroup workerGroup = new NioEventLoopGroup();
-
+	
 			this.bootstrap = new ServerBootstrap();
 			this.bootstrap
 					.group(bossGroup, workerGroup)
 					.channel(NioServerSocketChannel.class)
 					.childHandler(initializer);
-
+	
 			Channel ch = this.bootstrap.bind(configuredPort).sync().channel();
 			this.serverChannel = ch;
-
+			
 			InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress();
 			String address = bindAddress.getAddress().getHostAddress();
 			int port = bindAddress.getPort();
-
+			
 			LOG.info("Web frontend listening at " + address + ':' + port);
 		}
 	}
-
+	
 	@Override
 	public void stop() throws Exception {
 		synchronized (startupShutdownLock) {
@@ -203,7 +206,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			}
 		}
 	}
-
+	
 	@Override
 	public int getServerPort() {
 		Channel server = this.serverChannel;
@@ -215,15 +218,15 @@ public class WebRuntimeMonitor implements WebMonitor {
 				LOG.error("Cannot access local server port", e);
 			}
 		}
-
+			
 		return -1;
 	}
-
+	
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-
+	
 	private static RuntimeMonitorHandler handler(RequestHandler handler) {
 		return new RuntimeMonitorHandler(handler);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
index 9c3b45d..4efd029 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/MimeTypes.java
@@ -22,10 +22,10 @@ import java.util.HashMap;
 
 /**
  * Simple utility class that resolves file extensions to MIME types.
- * 
- * <p>There are various solutions built into Java that depend on extra resource and configuration
+ * <p>
+ * There are various solutions built into Java that depend on extra resource and configuration
  * files. They are designed to be composable and extensible, but also unfortunately tricky to control.
- * This is meant to be a simple solution that may eventually be subsumed by a better one.</p>
+ * This is meant to be a simple solution that may eventually be subsumed by a better one.
  */
 public class MimeTypes {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobSummaryHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobSummaryHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobSummaryHandler.java
index 2dfe4de..a682fd0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobSummaryHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobSummaryHandler.java
@@ -18,24 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.JsonFactory;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.Map;
 
 /**
  * Request handler that returns a summary of the job status.
  */
 public class JobSummaryHandler extends AbstractExecutionGraphRequestHandler implements RequestHandler.JsonResponse {
-
-	private final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 	
 	public JobSummaryHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
@@ -43,61 +37,7 @@ public class JobSummaryHandler extends AbstractExecutionGraphRequestHandler impl
 
 	@Override
 	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
-
-		JobID jid = graph.getJobID();
-		String name = graph.getJobName();
-		
-		long startTime = graph.getStatusTimestamp(JobStatus.CREATED);
-		long endTime = graph.getState().isTerminalState() ?
-				graph.getStatusTimestamp(graph.getState()) : -1;
-		
-		long duration = endTime == -1 ? System.currentTimeMillis() - startTime :
-				endTime - startTime;
-		
-		String startTimeString;
-		String endTimeTimeString;
-		String durationString = duration + " msecs";
-		
-		synchronized (dateFormatter) {
-			startTimeString = dateFormatter.format(new Date(startTime));
-			endTimeTimeString =  endTime == -1 ? "(pending)" : dateFormatter.format(new Date(endTime));
-		}
-		
-		String status = graph.getState().name();
-		
-		int pending = 0;
-		int running = 0;
-		int finished = 0;
-		int canceling = 0;
-		int canceled = 0;
-		int failed = 0;
-		
-		for (ExecutionJobVertex vertex : graph.getVerticesTopologically()) {
-			ExecutionState aggState = vertex.getAggregateState();
-			switch (aggState) {
-				case FINISHED:
-					finished++;
-					break;
-				case FAILED:
-					failed++;
-					break;
-				case CANCELED:
-					canceled++;
-					break;
-				case RUNNING:
-					running++;
-					break;
-				case CANCELING:
-					canceling++;
-					break;
-				default:
-					pending++;
-			}
-		}
-		
-		int total = pending + running + finished + canceling + canceled + failed;
-		
-		return JsonFactory.createJobSummaryJSON(jid, name, status, startTimeString, endTimeTimeString, durationString, 
-				total, pending, running, finished, canceling, canceled, failed);
+		JobDetails details = WebMonitorUtils.createDetailsForJob(graph);
+		return JsonFactory.generateJobDetailsJSON(details);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobsOverviewHandler.java
new file mode 100644
index 0000000..304c521
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobsOverviewHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.webmonitor.JsonFactory;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Map;
+
+/**
+ * Request handler that returns an overview of the running and/or finished jobs, including details for each job.
+ */
+public class JobsOverviewHandler implements RequestHandler, RequestHandler.JsonResponse {
+	
+	private final ActorGateway jobManager;
+	
+	private final FiniteDuration timeout;
+	
+	private final boolean includeRunningJobs;
+	private final boolean includeFinishedJobs;
+
+	
+	public JobsOverviewHandler(ActorGateway jobManager, FiniteDuration timeout,
+								boolean includeRunningJobs, boolean includeFinishedJobs) {
+		this.jobManager = jobManager;
+		this.timeout = timeout;
+		this.includeRunningJobs = includeRunningJobs;
+		this.includeFinishedJobs = includeFinishedJobs;
+	}
+
+	@Override
+	public String handleRequest(Map<String, String> params) throws Exception {
+		try {
+			Future<Object> future = jobManager.ask(
+					new RequestJobDetails(includeRunningJobs, includeFinishedJobs), timeout);
+			
+			MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
+			
+			if (includeRunningJobs && includeFinishedJobs) {
+				return JsonFactory.generateRunningAndFinishedJobDetailsJSON(
+						result.getRunningJobs(), result.getFinishedJobs());
+			}
+			else {
+				return JsonFactory.generateMultipleJobsDetailsJSON(
+						includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs());
+			}
+		}
+		catch (Exception e) {
+			throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
index 7268945..377e86a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.webmonitor.JsonFactory;
-import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
 
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -35,16 +34,12 @@ import java.util.Map;
  * TaskManagers are currently connected, and how many jobs are running.
  */
 public class RequestOverviewHandler implements  RequestHandler, RequestHandler.JsonResponse {
-
+	
 	private final ActorGateway jobManager;
-
+	
 	private final FiniteDuration timeout;
-
-
-	public RequestOverviewHandler(ActorGateway jobManager) {
-		this(jobManager, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
-	}
-
+	
+	
 	public RequestOverviewHandler(ActorGateway jobManager, FiniteDuration timeout) {
 		if (jobManager == null || timeout == null) {
 			throw new NullPointerException();
@@ -52,7 +47,7 @@ public class RequestOverviewHandler implements  RequestHandler, RequestHandler.J
 		this.jobManager = jobManager;
 		this.timeout = timeout;
 	}
-
+	
 	@Override
 	public String handleRequest(Map<String, String> params) throws Exception {
 		try {
@@ -64,4 +59,4 @@ public class RequestOverviewHandler implements  RequestHandler, RequestHandler.J
 			throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TextResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TextResponder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TextResponder.java
deleted file mode 100644
index 5cc7273..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TextResponder.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import java.util.Map;
-
-/**
- * Simple placeholder responder with a JSON object holding one key/value pair 
- */
-public class TextResponder implements RequestHandler, RequestHandler.TextResponse {
-	
-	private final String message;
-	
-	public TextResponder(String message) {
-		this.message = message == null ? "" : message;
-	}
-	
-	@Override
-	public String handleRequest(Map<String, String> params) throws Exception {
-		return message;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1d8a37c..47175a9 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -280,6 +280,14 @@ public class ExecutionGraph implements Serializable {
 	//  Configuration of Data-flow wide execution settings  
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Gets the number of job vertices currently held by this execution graph.
+	 * @return The current number of job vertices.
+	 */
+	public int getNumberOfExecutionJobVertices() {
+		return this.verticesInCreationOrder.size();
+	}
+	
 	public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
 		if (numberOfRetriesLeft < -1) {
 			throw new IllegalArgumentException();

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
new file mode 100644
index 0000000..c5f0c15
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import java.util.Arrays;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * An actor message with a detailed overview of the current status of a job.
+ *
+ * <p>Instances behave like value objects: {@link #equals(Object)} and {@link #hashCode()}
+ * take all fields into account. The per-state counter array is stored and returned by
+ * reference (no defensive copy is made), so it should not be modified after it has been
+ * handed to this class.
+ */
+public class JobDetails implements java.io.Serializable {
+
+	private static final long serialVersionUID = -3391462110304948766L;
+
+	/** The ID of the job; never null. */
+	private final JobID jobId;
+
+	/** The name of the job; never null. */
+	private final String jobName;
+
+	/** The start timestamp of the job. */
+	private final long startTime;
+
+	/** The end timestamp of the job. */
+	private final long endTime;
+
+	/** The status of the job; never null. */
+	private final JobStatus status;
+
+	/** The timestamp of the last update. */
+	private final long lastUpdateTime;
+
+	/** The number of vertices per execution state; never null. */
+	private final int[] numVerticesPerExecutionState;
+
+	/** The number of tasks of the job. */
+	private final int numTasks;
+
+	/**
+	 * Creates a new job details message.
+	 *
+	 * @param jobId The ID of the job, must not be null.
+	 * @param jobName The name of the job, must not be null.
+	 * @param startTime The start timestamp of the job.
+	 * @param endTime The end timestamp of the job.
+	 * @param status The status of the job, must not be null.
+	 * @param lastUpdateTime The timestamp of the last update.
+	 * @param numVerticesPerExecutionState The vertex counters per execution state, must not
+	 *                                     be null. The array is kept by reference.
+	 * @param numTasks The number of tasks of the job.
+	 * @throws NullPointerException Thrown, if any of the non-primitive arguments is null.
+	 */
+	public JobDetails(JobID jobId, String jobName,
+						long startTime, long endTime,
+						JobStatus status,
+						long lastUpdateTime,
+						int[] numVerticesPerExecutionState, int numTasks) {
+
+		this.jobId = checkNotNull(jobId);
+		this.jobName = checkNotNull(jobName);
+		this.status = checkNotNull(status);
+		this.numVerticesPerExecutionState = checkNotNull(numVerticesPerExecutionState);
+
+		this.startTime = startTime;
+		this.endTime = endTime;
+		this.lastUpdateTime = lastUpdateTime;
+		this.numTasks = numTasks;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	public String getJobName() {
+		return jobName;
+	}
+
+	public JobStatus getStatus() {
+		return status;
+	}
+
+	public long getStartTime() {
+		return startTime;
+	}
+
+	public long getEndTime() {
+		return endTime;
+	}
+
+	public long getLastUpdateTime() {
+		return lastUpdateTime;
+	}
+
+	public int getNumTasks() {
+		return numTasks;
+	}
+
+	/**
+	 * Gets the vertex counters per execution state. The returned array is the internal
+	 * array of this message, not a copy.
+	 */
+	public int[] getNumVerticesPerExecutionState() {
+		return numVerticesPerExecutionState;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || o.getClass() != JobDetails.class) {
+			return false;
+		}
+
+		final JobDetails other = (JobDetails) o;
+
+		// cheap primitive and reference comparisons first, then the object comparisons
+		return startTime == other.startTime
+				&& endTime == other.endTime
+				&& lastUpdateTime == other.lastUpdateTime
+				&& numTasks == other.numTasks
+				&& status == other.status
+				&& jobId.equals(other.jobId)
+				&& jobName.equals(other.jobName)
+				&& Arrays.equals(numVerticesPerExecutionState, other.numVerticesPerExecutionState);
+	}
+
+	@Override
+	public int hashCode() {
+		int hash = jobId.hashCode();
+		hash = 31 * hash + jobName.hashCode();
+		hash = 31 * hash + hashOf(startTime);
+		hash = 31 * hash + hashOf(endTime);
+		hash = 31 * hash + status.hashCode();
+		hash = 31 * hash + hashOf(lastUpdateTime);
+		hash = 31 * hash + Arrays.hashCode(numVerticesPerExecutionState);
+		hash = 31 * hash + numTasks;
+		return hash;
+	}
+
+	@Override
+	public String toString() {
+		final StringBuilder bld = new StringBuilder("JobDetails {");
+		bld.append("jobId=").append(jobId);
+		bld.append(", jobName='").append(jobName).append('\'');
+		bld.append(", startTime=").append(startTime);
+		bld.append(", endTime=").append(endTime);
+		bld.append(", status=").append(status);
+		bld.append(", lastUpdateTime=").append(lastUpdateTime);
+		bld.append(", numVerticesPerExecutionState=").append(Arrays.toString(numVerticesPerExecutionState));
+		bld.append(", numTasks=").append(numTasks);
+		bld.append('}');
+		return bld.toString();
+	}
+
+	/**
+	 * Folds a long value into an int by XOR-ing its upper and lower 32 bits.
+	 */
+	private static int hashOf(long value) {
+		return (int) (value ^ (value >>> 32));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
index a261c2a..de24c15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import java.util.ArrayList;
 import java.util.List;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * An overview of how many jobs are in which status.
  */
@@ -38,10 +40,10 @@ public class JobsWithIDsOverview implements InfoMessage {
 	public JobsWithIDsOverview(List<JobID> jobsRunningOrPending, List<JobID> jobsFinished, 
 								List<JobID> jobsCancelled, List<JobID> jobsFailed) {
 		
-		this.jobsRunningOrPending = jobsRunningOrPending;
-		this.jobsFinished = jobsFinished;
-		this.jobsCancelled = jobsCancelled;
-		this.jobsFailed = jobsFailed;
+		this.jobsRunningOrPending = checkNotNull(jobsRunningOrPending);
+		this.jobsFinished = checkNotNull(jobsFinished);
+		this.jobsCancelled = checkNotNull(jobsCancelled);
+		this.jobsFailed = checkNotNull(jobsFailed);
 	}
 
 	public JobsWithIDsOverview(JobsWithIDsOverview first, JobsWithIDsOverview second) {
@@ -69,6 +71,32 @@ public class JobsWithIDsOverview implements InfoMessage {
 	
 	// ------------------------------------------------------------------------
 
+
+	@Override
+	public int hashCode() {
+		return jobsRunningOrPending.hashCode() ^
+				jobsFinished.hashCode() ^
+				jobsCancelled.hashCode() ^
+				jobsFailed.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		else if (obj instanceof JobsWithIDsOverview) {
+			JobsWithIDsOverview that = (JobsWithIDsOverview) obj;
+			return this.jobsRunningOrPending.equals(that.jobsRunningOrPending) &&
+					this.jobsFinished.equals(that.jobsFinished) &&
+					this.jobsCancelled.equals(that.jobsCancelled) &&
+					this.jobsFailed.equals(that.jobsFailed);
+		}
+		else {
+			return false;
+		}
+	}
+
 	@Override
 	public String toString() {
 		return "JobsOverview {" +
@@ -82,6 +110,8 @@ public class JobsWithIDsOverview implements InfoMessage {
 	// ------------------------------------------------------------------------
 
 	private static ArrayList<JobID> combine(List<JobID> first, List<JobID> second) {
+		checkNotNull(first);
+		checkNotNull(second);
 		ArrayList<JobID> result = new ArrayList<JobID>(first.size() + second.size());
 		result.addAll(first);
 		result.addAll(second);

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
new file mode 100644
index 0000000..47de58a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import java.util.Arrays;
+
+/**
+ * An actor message describing the details of various jobs. This message is sent for example
+ * in response to the {@link org.apache.flink.runtime.messages.webmonitor.RequestJobDetails}
+ * message.
+ *
+ * <p>The message holds two arrays: details of running jobs and details of finished jobs.
+ * A {@code null} array passed to the constructor is replaced by an empty array, so the
+ * getters never return {@code null}. The arrays are stored and returned by reference,
+ * without defensive copies.
+ */
+public class MultipleJobsDetails implements java.io.Serializable {
+
+	private static final long serialVersionUID = -1526236139616019127L;
+
+	/** Shared placeholder for absent job lists; zero-length, hence safe to share. */
+	private static final JobDetails[] EMPTY = new JobDetails[0];
+
+	/** Details of the running jobs; never null. */
+	private final JobDetails[] runningJobs;
+
+	/** Details of the finished jobs; never null. */
+	private final JobDetails[] finishedJobs;
+
+	/**
+	 * Creates a new message with the given job details.
+	 *
+	 * @param running The details of the running jobs, or null, if there are none.
+	 * @param finished The details of the finished jobs, or null, if there are none.
+	 */
+	public MultipleJobsDetails(JobDetails[] running, JobDetails[] finished) {
+		this.runningJobs = running == null ? EMPTY : running;
+		this.finishedJobs = finished == null ? EMPTY : finished;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the details of the running jobs.
+	 * @return The array held by this message (not a copy), possibly empty, never null.
+	 */
+	public JobDetails[] getRunningJobs() {
+		return runningJobs;
+	}
+
+	/**
+	 * Gets the details of the finished jobs.
+	 * @return The array held by this message (not a copy), possibly empty, never null.
+	 */
+	public JobDetails[] getFinishedJobs() {
+		return finishedJobs;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		// weight the first array so that swapping "running" and "finished" changes the hash;
+		// a plain sum would make such messages collide although they are not equal
+		return 31 * Arrays.deepHashCode(runningJobs) + Arrays.deepHashCode(finishedJobs);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		else if (obj instanceof MultipleJobsDetails) {
+			MultipleJobsDetails that = (MultipleJobsDetails) obj;
+			return Arrays.deepEquals(this.runningJobs, that.runningJobs) &&
+					Arrays.deepEquals(this.finishedJobs, that.finishedJobs);
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "MultipleJobsDetails {" +
+				"running=" + Arrays.toString(runningJobs) +
+				", finished=" + Arrays.toString(finishedJobs) +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobDetails.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobDetails.java
new file mode 100644
index 0000000..91025bf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/RequestJobDetails.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+/**
+ * This message requests an overview of the jobs on the JobManager,
+ * including running jobs and/or finished jobs.
+ * <p>
+ * The response to this message is a
+ * {@link org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails} message.
+ */
+public class RequestJobDetails implements InfoMessage {
+
+	private static final long serialVersionUID = 5208137000412166747L;
+	
+	private final boolean includeRunning;
+	private final boolean includeFinished;
+
+	public RequestJobDetails(boolean includeRunning, boolean includeFinished) {
+		this.includeRunning = includeRunning;
+		this.includeFinished = includeFinished;
+	}
+	
+	// ------------------------------------------------------------------------
+
+	public boolean shouldIncludeFinished() {
+		return includeFinished;
+	}
+
+	public boolean shouldIncludeRunning() {
+		return includeRunning;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o instanceof RequestJobDetails) {
+			RequestJobDetails that = (RequestJobDetails) o;
+
+			return this.includeFinished == that.includeFinished &&
+					this.includeRunning == that.includeRunning;
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return (includeRunning ? 31 : 0) + (includeFinished ? 1 : 0);
+	}
+
+	@Override
+	public String toString() {
+		return "RequestJobDetails{" +
+				"includeRunning=" + includeRunning +
+				", includeFinished=" + includeFinished +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
new file mode 100644
index 0000000..aa784e6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+
+/**
+ * Utilities for the web runtime monitor. This class contains for example methods to build
+ * messages with aggregate information about the state of an execution graph, to be sent
+ * to the web server.
+ */
+public class WebMonitorUtils {
+
+	/**
+	 * Summarizes the current state of the given execution graph in a {@link JobDetails} message.
+	 *
+	 * <p>The start time is the timestamp of the job's {@code CREATED} status. The end time is
+	 * the timestamp of the current status if that status is terminal, and {@code -1} otherwise.
+	 * The tasks are counted per execution state, using the ordinal of the
+	 * {@link ExecutionState} as the array index. The last update time is the largest timestamp
+	 * at which any task entered its current state, or {@code 0} if the graph has no tasks.
+	 *
+	 * @param job The execution graph to summarize.
+	 * @return The details message describing the graph.
+	 */
+	public static JobDetails createDetailsForJob(ExecutionGraph job) {
+		final JobStatus currentStatus = job.getState();
+
+		final long startTime = job.getStatusTimestamp(JobStatus.CREATED);
+
+		final long endTime;
+		if (currentStatus.isTerminalState()) {
+			endTime = job.getStatusTimestamp(currentStatus);
+		} else {
+			endTime = -1L;
+		}
+
+		final int[] tasksPerState = new int[ExecutionState.values().length];
+		long latestChange = 0;
+		int taskCount = 0;
+
+		for (ExecutionJobVertex jobVertex : job.getVerticesTopologically()) {
+			for (ExecutionVertex task : jobVertex.getTaskVertices()) {
+				final ExecutionState taskState = task.getExecutionState();
+				tasksPerState[taskState.ordinal()]++;
+
+				final long changedAt = task.getStateTimestamp(taskState);
+				if (changedAt > latestChange) {
+					latestChange = changedAt;
+				}
+
+				taskCount++;
+			}
+		}
+
+		return new JobDetails(
+				job.getJobID(), job.getJobName(),
+				startTime, endTime, currentStatus, latestChange,
+				tasksPerState, taskCount);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 64ed129..d93b2ed 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -56,7 +56,7 @@ import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
-import org.apache.flink.runtime.webmonitor.WebMonitor
+import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
 import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessageFilter}
 import org.apache.flink.runtime.LogMessages
 import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
@@ -921,6 +921,26 @@ class JobManager(
                 ourJobs, archiveOverview)
           }(context.dispatcher)
 
+        case msg : RequestJobDetails => 
+          
+          val ourDetails: Array[JobDetails] = if (msg.shouldIncludeRunning()) {
+            currentJobs.values.map {
+              v => WebMonitorUtils.createDetailsForJob(v._1)
+            }.toArray[JobDetails]
+          } else {
+            null
+          }
+          
+          if (msg.shouldIncludeFinished()) {
+            val future = (archive ? msg)(timeout)
+            future.onSuccess {
+              case archiveDetails: MultipleJobsDetails =>
+                theSender ! new MultipleJobsDetails(ourDetails, archiveDetails.getFinishedJobs())
+            }(context.dispatcher)
+          } else {
+            theSender ! new MultipleJobsDetails(ourDetails, null)
+          }
+          
         case _ => log.error("Unrecognized info message " + actorMessage)
       }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 702e34b..55a7ccb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -20,10 +20,12 @@ package org.apache.flink.runtime.jobmanager
 
 import java.util
 
+import akka.actor.ActorRef
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.messages.accumulators._
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils
 import org.apache.flink.runtime.{FlinkActor, LogMessages}
 import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
@@ -64,12 +66,12 @@ class MemoryArchivist(private val max_entries: Int)
    * Map of execution graphs belonging to recently started jobs with the time stamp of the last
    * received job event. The insert order is preserved through a LinkedHashMap.
    */
-  val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]()
+  protected val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]()
 
   /* Counters for finished, canceled, and failed jobs */
-  var finishedCnt: Int = 0
-  var canceledCnt: Int = 0
-  var failedCnt: Int = 0
+  private[this] var finishedCnt: Int = 0
+  private[this] var canceledCnt: Int = 0
+  private[this] var failedCnt: Int = 0
 
   override def preStart(): Unit = {
     log.info(s"Started memory archivist ${self.path}")
@@ -93,6 +95,8 @@ class MemoryArchivist(private val max_entries: Int)
 
       trimHistory()
 
+    case msg : InfoMessage => handleWebServerInfoMessage(msg, sender())
+      
     case RequestArchivedJob(jobID: JobID) =>
       val graph = graphs.get(jobID)
       sender ! decorateMessage(ArchivedJob(graph))
@@ -115,23 +119,6 @@ class MemoryArchivist(private val max_entries: Int)
     case RequestJobCounts =>
       sender ! decorateMessage((finishedCnt, canceledCnt, failedCnt))
 
-    case _ : RequestJobsOverview =>
-      try {
-        sender ! createJobsOverview()
-      }
-      catch {
-        case t: Throwable => log.error("Exception while creating the jobs overview", t)
-      }
-
-    case _ : RequestJobsWithIDsOverview =>
-      try {
-        sender ! createJobsWithIDsOverview()
-      }
-      catch {
-        case t: Throwable => log.error("Exception while creating the jobs overview", t)
-      }
-
-
     case RequestAccumulatorResults(jobID) =>
       try {
         graphs.get(jobID) match {
@@ -165,6 +152,33 @@ class MemoryArchivist(private val max_entries: Int)
     throw new RuntimeException("Received unknown message " + message)
   }
 
+  
+  /**
+   * Answers the info requests of the web server with data about the archived jobs.
+   *
+   * All replies go to the explicitly passed `theSender`. Info messages for which this
+   * archivist has no answer are logged rather than left to fail the pattern match.
+   *
+   * @param message The info request to answer.
+   * @param theSender The actor that sent the request and receives the reply.
+   */
+  private def handleWebServerInfoMessage(message: InfoMessage, theSender: ActorRef): Unit = {
+    message match {
+      case _ : RequestJobsOverview =>
+        try {
+          theSender ! createJobsOverview()
+        }
+        catch {
+          case t: Throwable => log.error("Exception while creating the jobs overview", t)
+        }
+
+      case _ : RequestJobsWithIDsOverview =>
+        try {
+          theSender ! createJobsWithIDsOverview()
+        }
+        catch {
+          case t: Throwable => log.error("Exception while creating the jobs overview", t)
+        }
+
+      case _ : RequestJobDetails =>
+        val details = graphs.values.map {
+          v => WebMonitorUtils.createDetailsForJob(v)
+        }.toArray[JobDetails]
+
+        // the archive only knows finished jobs, so the "running" part stays empty
+        theSender ! new MultipleJobsDetails(null, details)
+
+      // without this case, any other InfoMessage would raise a MatchError in the actor
+      case _ => log.error("Unrecognized info message " + message)
+    }
+  }
 
   // --------------------------------------------------------------------------
   //  Request Responses

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java
new file mode 100644
index 0000000..dcf4839
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/GenericMessageTester.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.ParameterizedType;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test utilities for message classes. The class offers generic checks for {@code equals()},
+ * {@code hashCode()}, {@code toString()} and serializability, and helpers that reflectively
+ * create message instances with randomly generated constructor arguments.
+ */
+public class GenericMessageTester {
+
+	/**
+	 * Creates a copy of the given instance via {@code CommonTestUtils.createCopySerializable}
+	 * and checks that original and copy are equal (in both directions), have the same hash
+	 * code, and the same string representation.
+	 *
+	 * @param instance The message instance to test.
+	 * @throws Exception Forwarded from the copy step.
+	 */
+	public static void testMessageInstance(Serializable instance) throws Exception {
+		Serializable copy = CommonTestUtils.createCopySerializable(instance);
+		
+		// test equals, hash code, toString
+		assertTrue(instance.equals(copy));
+		assertTrue(copy.equals(instance));
+		assertTrue(instance.hashCode() == copy.hashCode());
+		assertTrue(instance.toString().equals(copy.toString()));
+	}
+	
+	/**
+	 * Checks that the two given instances are equal (in both directions), have the same hash
+	 * code and string representation, and that a serialized copy of the first instance is
+	 * equal to it and has the same hash code.
+	 *
+	 * @param instance1 The first message instance.
+	 * @param instance2 The second message instance, expected to be equal to the first.
+	 * @throws Exception Forwarded from the copy step.
+	 */
+	public static void testMessageInstances(Serializable instance1, Serializable instance2) throws Exception {
+		// test equals, hash code, toString
+		assertTrue(instance1.equals(instance2));
+		assertTrue(instance2.equals(instance1));
+		assertTrue(instance1.hashCode() == instance2.hashCode());
+		assertTrue(instance1.toString().equals(instance2.toString()));
+
+		// test serializability
+		Serializable copy = CommonTestUtils.createCopySerializable(instance1);
+		assertTrue(instance1.equals(copy));
+		assertTrue(copy.equals(instance1));
+		assertTrue(instance1.hashCode() == copy.hashCode());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Random Generators
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Reflectively creates an instance of the given class with random constructor arguments.
+	 *
+	 * <p>The public constructors are tried one after the other. For every parameter, an
+	 * instantiator is looked up by the exact parameter type, first among the extra
+	 * instantiators and then among the built-in ones. The first constructor for which all
+	 * parameters can be generated is invoked. If no constructor qualifies, or if the
+	 * reflective calls fail, the test is failed via {@code fail(...)}.
+	 *
+	 * <p>The type handled by an extra instantiator is taken from the first type argument of
+	 * the first generic interface its class implements, so such instantiators need to
+	 * implement {@code Instantiator<SomeClass>} directly.
+	 *
+	 * @param messageClass The class to instantiate.
+	 * @param rnd The source of randomness for the constructor arguments.
+	 * @param extraInstantiators Additional instantiators, taking precedence over the built-in ones.
+	 * @param <T> The type of the created instance.
+	 * @return The new instance.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> T instantiateGeneric(Class<T> messageClass, Random rnd, Instantiator<?>... extraInstantiators) {
+		try {
+			// build the map of extra instantiators
+			Map<Class<?>, Instantiator<?>> extraInsts = new HashMap<>();
+			for (Instantiator<?> inst : extraInstantiators) {
+				Class<?> type = (Class<?>) ((ParameterizedType) inst.getClass().getGenericInterfaces()[0]).getActualTypeArguments()[0];
+				assertNotNull("Cannot get type for extra instantiator", type);
+				extraInsts.put(type, inst);
+			}
+
+			Constructor<?>[] constructors = messageClass.getConstructors();
+
+			Class<?> missingType = null;
+			
+			outer:
+			for (Constructor<?> constructor : constructors) {
+				
+				Class<?>[] paramTypes = constructor.getParameterTypes();
+				Object[] params = new Object[paramTypes.length];
+
+				for (int i = 0; i < paramTypes.length; i++) {
+					Instantiator<?> inst = extraInsts.get(paramTypes[i]);
+					if (inst == null) {
+						inst = INSTANTIATORS.get(paramTypes[i]);
+					}
+					
+					if (inst == null) {
+						// remember the type for the error message and try the next constructor
+						missingType = paramTypes[i];
+						continue outer;
+					}
+					
+					params[i] = inst.instantiate(rnd);
+				}
+
+				return (T) constructor.newInstance(params);
+			}
+
+			// no constructor could be used; 'missingType' is only null if the class
+			// has no public constructor at all, which gets its own message
+			if (missingType == null) {
+				fail("Class " + messageClass.getName() + " has no public constructor");
+			}
+			else {
+				fail("No instantiator available for type " + missingType.getCanonicalName());
+			}
+			throw new RuntimeException();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Could not perform reflective tests: " + e.getMessage());
+			throw new RuntimeException();
+		}
+	}
+	
+	/**
+	 * Creates a string of random length between 0 and 64 characters (both inclusive), where
+	 * each character is a random int truncated to a {@code char}.
+	 */
+	public static String randomString(Random rnd) {
+		int len = rnd.nextInt(64 + 1);
+		char[] chars = new char[len];
+		for (int i = 0; i < len; i++) {
+			chars[i] = (char) rnd.nextInt();
+		}
+		return new String(chars);
+	}
+	
+	/**
+	 * Creates a job ID from two random long values.
+	 */
+	public static JobID randomJobId(Random rnd) {
+		return new JobID(rnd.nextLong(), rnd.nextLong());
+	}
+	
+	/**
+	 * Picks a random value of the {@link JobStatus} enumeration.
+	 */
+	public static JobStatus randomJobStatus(Random rnd) {
+		return JobStatus.values()[rnd.nextInt(JobStatus.values().length)];
+	}
+
+	// ------------------------------------------------------------------------
+	//  Map of Instantiators
+	// ------------------------------------------------------------------------
+	
+	/** The built-in instantiators, keyed by the exact parameter type they generate values for. */
+	private static final Map<Class<?>, Instantiator<?>> INSTANTIATORS = new HashMap<>();
+	
+	static {
+		INSTANTIATORS.put(boolean.class, new BooleanInstantiator());
+		INSTANTIATORS.put(Boolean.class, new BooleanInstantiator());
+
+		INSTANTIATORS.put(char.class, new CharInstantiator());
+		INSTANTIATORS.put(Character.class, new CharInstantiator());
+
+		INSTANTIATORS.put(byte.class, new ByteInstantiator());
+		INSTANTIATORS.put(Byte.class, new ByteInstantiator());
+
+		INSTANTIATORS.put(short.class, new ShortInstantiator());
+		INSTANTIATORS.put(Short.class, new ShortInstantiator());
+
+		INSTANTIATORS.put(int.class, new IntInstantiator());
+		INSTANTIATORS.put(Integer.class, new IntInstantiator());
+
+		INSTANTIATORS.put(long.class, new LongInstantiator());
+		INSTANTIATORS.put(Long.class, new LongInstantiator());
+
+		INSTANTIATORS.put(float.class, new FloatInstantiator());
+		INSTANTIATORS.put(Float.class, new FloatInstantiator());
+
+		INSTANTIATORS.put(double.class, new DoubleInstantiator());
+		INSTANTIATORS.put(Double.class, new DoubleInstantiator());
+
+		INSTANTIATORS.put(String.class, new StringInstantiator());
+
+		INSTANTIATORS.put(JobID.class, new JobIdInstantiator());
+		INSTANTIATORS.put(JobStatus.class, new JobStatusInstantiator());
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Instantiators
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * A factory for random values of a certain type.
+	 *
+	 * @param <T> The type of the generated values.
+	 */
+	public static interface Instantiator<T> {
+		
+		/**
+		 * Creates a new value, drawing all randomness from the given generator.
+		 */
+		T instantiate(Random rnd);
+	}
+
+	/** Generates bytes with values in the range [0, 100). */
+	public static class ByteInstantiator implements Instantiator<Byte> {
+
+		@Override
+		public Byte instantiate(Random rnd) {
+			int i = rnd.nextInt(100);
+			return (byte) i;
+		}
+	}
+
+	/** Generates shorts with values in the range [0, 30000). */
+	public static class ShortInstantiator implements Instantiator<Short> {
+
+		@Override
+		public Short instantiate(Random rnd) {
+			return (short) rnd.nextInt(30000);
+		}
+	}
+
+	/** Generates non-negative ints below {@code Integer.MAX_VALUE}. */
+	public static class IntInstantiator implements Instantiator<Integer> {
+
+		@Override
+		public Integer instantiate(Random rnd) {
+			return rnd.nextInt(Integer.MAX_VALUE);
+		}
+	}
+
+	/** Generates non-negative longs below {@code Integer.MAX_VALUE}. */
+	public static class LongInstantiator implements Instantiator<Long> {
+
+		@Override
+		public Long instantiate(Random rnd) {
+			return (long) rnd.nextInt(Integer.MAX_VALUE);
+		}
+	}
+
+	/** Generates floats via {@code Random.nextFloat()}. */
+	public static class FloatInstantiator implements Instantiator<Float> {
+
+		@Override
+		public Float instantiate(Random rnd) {
+			return rnd.nextFloat();
+		}
+	}
+
+	/** Generates doubles via {@code Random.nextDouble()}. */
+	public static class DoubleInstantiator implements Instantiator<Double> {
+
+		@Override
+		public Double instantiate(Random rnd) {
+			return rnd.nextDouble();
+		}
+	}
+
+	/** Generates random booleans. */
+	public static class BooleanInstantiator implements Instantiator<Boolean> {
+
+		@Override
+		public Boolean instantiate(Random rnd) {
+			return rnd.nextBoolean();
+		}
+	}
+
+	/** Generates chars with code units in the range [0, 30000). */
+	public static class CharInstantiator implements Instantiator<Character> {
+
+		@Override
+		public Character instantiate(Random rnd) {
+			return (char) rnd.nextInt(30000);
+		}
+	}
+
+	/** Generates strings via {@link #randomString(Random)}. */
+	public static class StringInstantiator implements Instantiator<String> {
+
+		@Override
+		public String instantiate(Random rnd) {
+			return randomString(rnd);
+		}
+	}
+
+	/** Generates job IDs via {@link #randomJobId(Random)}. */
+	public static class JobIdInstantiator implements Instantiator<JobID> {
+
+		@Override
+		public JobID instantiate(Random rnd) {
+			return randomJobId(rnd);
+		}
+	}
+
+	/** Generates job statuses via {@link #randomJobStatus(Random)}. */
+	public static class JobStatusInstantiator implements Instantiator<JobStatus> {
+
+		@Override
+		public JobStatus instantiate(Random rnd) {
+			return randomJobStatus(rnd);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
new file mode 100644
index 0000000..b70853e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobsOverview;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Runs the web monitor request and response messages through the checks of
+ * {@link GenericMessageTester}.
+ */
+public class WebMonitorMessagesTest {
+
+	/** Checks the request messages obtained via getInstance() and the status / overview responses. */
+	@Test
+	public void testStatusMessages() throws Exception {
+		final Random rnd = new Random();
+
+		GenericMessageTester.testMessageInstance(RequestJobsOverview.getInstance());
+		GenericMessageTester.testMessageInstance(RequestJobsWithIDsOverview.getInstance());
+		GenericMessageTester.testMessageInstance(RequestStatusOverview.getInstance());
+
+		GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(RequestJobsOverview.class, rnd));
+
+		GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(StatusOverview.class, rnd));
+		GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(JobsOverview.class, rnd));
+
+		GenericMessageTester.testMessageInstance(new JobsWithIDsOverview(
+				randomIds(rnd), randomIds(rnd), randomIds(rnd), randomIds(rnd)));
+	}
+
+	/** Checks two {@link JobDetails} messages that are built from identical field values. */
+	@Test
+	public void testJobDetailsMessage() throws Exception {
+		final Random rnd = new Random();
+
+		final int[] numVerticesPerState = randomVertexCounts(rnd);
+		final int numTotal = sum(numVerticesPerState);
+
+		final long time = rnd.nextLong();
+		final long endTime = rnd.nextBoolean() ? -1L : time + rnd.nextInt();
+		final long lastModified = endTime == -1 ? time + rnd.nextInt() : endTime;
+
+		final String name = GenericMessageTester.randomString(rnd);
+		final JobID jid = GenericMessageTester.randomJobId(rnd);
+		final JobStatus status = GenericMessageTester.randomJobStatus(rnd);
+
+		JobDetails msg1 = new JobDetails(jid, name, time, endTime, status, lastModified, numVerticesPerState, numTotal);
+		JobDetails msg2 = new JobDetails(jid, name, time, endTime, status, lastModified, numVerticesPerState, numTotal);
+
+		GenericMessageTester.testMessageInstances(msg1, msg2);
+	}
+
+	/** Checks a {@link MultipleJobsDetails} message that holds two random arrays of job details. */
+	@Test
+	public void testMultipleJobDetails() throws Exception {
+		final Random rnd = new Random();
+
+		GenericMessageTester.testMessageInstance(
+				new MultipleJobsDetails(randomJobDetails(rnd), randomJobDetails(rnd)));
+	}
+
+	/** Creates a list of 0 to 19 job IDs whose two parts are drawn from the given random source. */
+	private static List<JobID> randomIds(Random rnd) {
+		final int num = rnd.nextInt(20);
+		final List<JobID> ids = new ArrayList<>(num);
+
+		for (int i = 0; i < num; i++) {
+			ids.add(new JobID(rnd.nextLong(), rnd.nextLong()));
+		}
+
+		return ids;
+	}
+
+	/** Draws one vertex count in the range [0, 55) for every {@link ExecutionState} value. */
+	private static int[] randomVertexCounts(Random rnd) {
+		final int[] counts = new int[ExecutionState.values().length];
+		for (int i = 0; i < counts.length; i++) {
+			counts[i] = rnd.nextInt(55);
+		}
+		return counts;
+	}
+
+	/** Adds up all entries of the given array. */
+	private static int sum(int[] values) {
+		int total = 0;
+		for (int value : values) {
+			total += value;
+		}
+		return total;
+	}
+
+	/** Creates an array of 0 to 9 job details whose fields are filled with random values. */
+	private static JobDetails[] randomJobDetails(Random rnd) {
+		final JobDetails[] details = new JobDetails[rnd.nextInt(10)];
+		for (int k = 0; k < details.length; k++) {
+			final int[] numVerticesPerState = randomVertexCounts(rnd);
+			final int numTotal = sum(numVerticesPerState);
+
+			final long time = rnd.nextLong();
+			final long endTime = rnd.nextBoolean() ? -1L : time + rnd.nextInt();
+			final long lastModified = endTime == -1 ? time + rnd.nextInt() : endTime;
+
+			// use the shared generators, like testJobDetailsMessage() does, and hand them
+			// the given random source (also for the job ID) instead of ad-hoc instantiation
+			final String name = GenericMessageTester.randomString(rnd);
+			final JobID jid = GenericMessageTester.randomJobId(rnd);
+			final JobStatus status = GenericMessageTester.randomJobStatus(rnd);
+
+			details[k] = new JobDetails(jid, name, time, endTime, status, lastModified, numVerticesPerState, numTotal);
+		}
+		return details;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/628e5f71/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
index 2ccddfa..167afbf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import org.apache.flink.runtime.{FlinkActor}
+import org.apache.flink.runtime.FlinkActor
 import org.apache.flink.runtime.jobmanager.MemoryArchivist
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound, RequestExecutionGraph}