You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/25 21:47:25 UTC

flink git commit: [FLINK-7638] [flip6] Port CurrentJobsOverviewHandler to new REST endpoint

Repository: flink
Updated Branches:
  refs/heads/master f1b2b83d6 -> e585aed8c


[FLINK-7638] [flip6] Port CurrentJobsOverviewHandler to new REST endpoint

Ports the CurrentJobsOverviewHandler to the new REST endpoint by letting it implement
the LegacyRestHandler interface. This commit changes the JobDetails JSON such that it
now contains the number of tasks for each ExecutionState, including SCHEDULED,
DEPLOYING, CREATED and RECONCILING. These states will now also be displayed in the
web frontend.

Change MultipleJobsDetails to store a Collection<JobDetails> instead of JobDetails[]

Use the MultipleJobsDetails#FIELD_NAME_* constants for serialization in CurrentJobsOverviewHandler

This closes #4688.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e585aed8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e585aed8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e585aed8

Branch: refs/heads/master
Commit: e585aed8ce751d769b56054fc1ffd4be24350e91
Parents: f1b2b83
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Sep 18 22:58:22 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 25 23:47:02 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  21 ++-
 .../runtime/dispatcher/DispatcherGateway.java   |   3 +
 .../dispatcher/DispatcherRestEndpoint.java      |  15 ++
 .../entrypoint/SessionClusterEntrypoint.java    |   2 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   4 +
 .../flink/runtime/jobmaster/JobMaster.java      |   7 +
 .../runtime/jobmaster/JobMasterGateway.java     |   3 +
 .../runtime/messages/webmonitor/JobDetails.java | 158 ++++++++++++++++---
 .../webmonitor/MultipleJobsDetails.java         |  95 +++++++----
 .../handler/legacy/ClusterOverviewHandler.java  |   2 -
 .../legacy/CurrentJobsOverviewHandler.java      |  69 ++++----
 .../handler/legacy/metrics/MetricFetcher.java   |   4 +-
 .../CurrentJobsOverviewHandlerHeaders.java      |  70 ++++++++
 .../runtime/webmonitor/WebMonitorUtils.java     |  14 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   7 +-
 .../runtime/jobmanager/MemoryArchivist.scala    |   2 +-
 .../messages/WebMonitorMessagesTest.java        |  12 +-
 .../messages/webmonitor/JobDetailsTest.java     |  62 ++++++++
 .../webmonitor/MultipleJobsDetailsTest.java     |  86 ++++++++++
 .../legacy/CurrentJobsOverviewHandlerTest.java  |  17 +-
 .../legacy/metrics/MetricFetcherTest.java       |   2 +-
 21 files changed, 532 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 153ee53..8af3434 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -49,6 +51,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -235,7 +238,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	@Override
 	public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
-		// TODO: return proper list of running jobs
 		return CompletableFuture.completedFuture(jobManagerRunners.keySet());
 	}
 
@@ -258,6 +260,23 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				8));
 	}
 
+	@Override
+	public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {
+		final int numberJobsRunning = jobManagerRunners.size();
+
+		ArrayList<CompletableFuture<JobDetails>> individualJobDetails = new ArrayList<>(numberJobsRunning);
+
+		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
+			individualJobDetails.add(jobManagerRunner.getJobManagerGateway().requestJobDetails(timeout));
+		}
+
+		CompletableFuture<Collection<JobDetails>> combinedJobDetails = FutureUtils.combineAll(individualJobDetails);
+
+		return combinedJobDetails.thenApply(
+			(Collection<JobDetails> jobDetails) ->
+				new MultipleJobsDetails(jobDetails, null));
+	}
+
 	/**
 	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
 	 * the data will also be removed from HA.

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index ee5484e..6aaf0b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
@@ -56,4 +57,6 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
 		@RpcTimeout Time timeout);
 
 	CompletableFuture<StatusOverview> requestStatusOverview(@RpcTimeout Time timeout);
+
+	CompletableFuture<MultipleJobsDetails> requestJobDetails(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 6054a7d..dff5df8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -20,18 +20,21 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
+import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
@@ -93,6 +96,17 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 				executor,
 				restConfiguration.getRefreshInterval()));
 
+		LegacyRestHandlerAdapter<DispatcherGateway, MultipleJobsDetails, EmptyMessageParameters> currentJobsOverviewHandler = new LegacyRestHandlerAdapter<>(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			CurrentJobsOverviewHandlerHeaders.getInstance(),
+			new CurrentJobsOverviewHandler(
+				executor,
+				timeout,
+				true,
+				true));
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -110,6 +124,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 
 		handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
 		handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler));
+		handlers.add(Tuple2.of(CurrentJobsOverviewHandlerHeaders.getInstance(), currentJobsOverviewHandler));
 
 		optWebContent.ifPresent(
 			webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index e394854..c4da6db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -75,7 +75,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 		LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
 			rpcService,
 			DispatcherGateway.class,
-			uuid -> new DispatcherId(uuid),
+			DispatcherId::new,
 			10,
 			Time.milliseconds(50L));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 6f5a082..0bf0cc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -177,6 +177,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 		}
 	}
 
+	public JobMasterGateway getJobManagerGateway() {
+		return jobManager.getSelfGateway(JobMasterGateway.class);
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Lifecycle management
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 343fbf6..19fe4a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
@@ -90,6 +91,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -718,6 +720,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
 	}
 
+	@Override
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+		return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(executionGraph), executor);
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Internal methods
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 965d88d..c2fba47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateServerAddress;
@@ -209,4 +210,6 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 	 * @param resourceID unique id of the resource manager
 	 */
 	void heartbeatFromResourceManager(final ResourceID resourceID);
+
+	CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index 31ea516..2aca75b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -19,8 +19,21 @@
 package org.apache.flink.runtime.messages.webmonitor;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.Arrays;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -28,40 +41,58 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * An actor message with a detailed overview of the current status of a job.
  */
-public class JobDetails implements java.io.Serializable {
+@JsonSerialize(using = JobDetails.JobDetailsSerializer.class)
+@JsonDeserialize(using = JobDetails.JobDetailsDeserializer.class)
+public class JobDetails implements Serializable {
 
 	private static final long serialVersionUID = -3391462110304948766L;
-	
+
+	private static final String FIELD_NAME_JOB_ID = "jid";
+	private static final String FIELD_NAME_JOB_NAME = "name";
+	private static final String FIELD_NAME_START_TIME = "start-time";
+	private static final String FIELD_NAME_END_TIME = "end-time";
+	private static final String FIELD_NAME_DURATION = "duration";
+	private static final String FIELD_NAME_STATUS = "state";
+	private static final String FIELD_NAME_LAST_MODIFICATION = "last-modification";
+	private static final String FIELD_NAME_TOTAL_NUMBER_TASKS = "total";
+
 	private final JobID jobId;
-	
+
 	private final String jobName;
-	
+
 	private final long startTime;
-	
+
 	private final long endTime;
-	
+
+	private final long duration;
+
 	private final JobStatus status;
-	
+
 	private final long lastUpdateTime;
 
-	private final int[] numVerticesPerExecutionState;
+	private final int[] tasksPerState;
 	
 	private final int numTasks;
 
-	
-	public JobDetails(JobID jobId, String jobName,
-						long startTime, long endTime,
-						JobStatus status,
-						long lastUpdateTime,
-						int[] numVerticesPerExecutionState, int numTasks) {
-		
+	public JobDetails(
+			JobID jobId,
+			String jobName,
+			long startTime,
+			long endTime,
+			long duration,
+			JobStatus status,
+			long lastUpdateTime,
+			int[] tasksPerState,
+			int numTasks) {
+
 		this.jobId = checkNotNull(jobId);
 		this.jobName = checkNotNull(jobName);
 		this.startTime = startTime;
 		this.endTime = endTime;
+		this.duration = duration;
 		this.status = checkNotNull(status);
 		this.lastUpdateTime = lastUpdateTime;
-		this.numVerticesPerExecutionState = checkNotNull(numVerticesPerExecutionState);
+		this.tasksPerState = checkNotNull(tasksPerState);
 		this.numTasks = numTasks;
 	}
 	
@@ -83,6 +114,10 @@ public class JobDetails implements java.io.Serializable {
 		return endTime;
 	}
 
+	public long getDuration() {
+		return duration;
+	}
+
 	public JobStatus getStatus() {
 		return status;
 	}
@@ -95,8 +130,8 @@ public class JobDetails implements java.io.Serializable {
 		return numTasks;
 	}
 
-	public int[] getNumVerticesPerExecutionState() {
-		return numVerticesPerExecutionState;
+	public int[] getTasksPerState() {
+		return tasksPerState;
 	}
 
 	// ------------------------------------------------------------------------
@@ -116,7 +151,7 @@ public class JobDetails implements java.io.Serializable {
 					this.status == that.status &&
 					this.jobId.equals(that.jobId) &&
 					this.jobName.equals(that.jobName) &&
-					Arrays.equals(this.numVerticesPerExecutionState, that.numVerticesPerExecutionState);
+					Arrays.equals(this.tasksPerState, that.tasksPerState);
 		}
 		else {
 			return false;
@@ -131,7 +166,7 @@ public class JobDetails implements java.io.Serializable {
 		result = 31 * result + (int) (endTime ^ (endTime >>> 32));
 		result = 31 * result + status.hashCode();
 		result = 31 * result + (int) (lastUpdateTime ^ (lastUpdateTime >>> 32));
-		result = 31 * result + Arrays.hashCode(numVerticesPerExecutionState);
+		result = 31 * result + Arrays.hashCode(tasksPerState);
 		result = 31 * result + numTasks;
 		return result;
 	}
@@ -145,8 +180,89 @@ public class JobDetails implements java.io.Serializable {
 				", endTime=" + endTime +
 				", status=" + status +
 				", lastUpdateTime=" + lastUpdateTime +
-				", numVerticesPerExecutionState=" + Arrays.toString(numVerticesPerExecutionState) +
+				", numVerticesPerExecutionState=" + Arrays.toString(tasksPerState) +
 				", numTasks=" + numTasks +
 				'}';
 	}
+
+	public static final class JobDetailsSerializer extends StdSerializer<JobDetails> {
+		private static final long serialVersionUID = 7915913423515194428L;
+
+		public JobDetailsSerializer() {
+			super(JobDetails.class);
+		}
+
+		@Override
+		public void serialize(
+				JobDetails jobDetails,
+				JsonGenerator jsonGenerator,
+				SerializerProvider serializerProvider) throws IOException {
+			jsonGenerator.writeStartObject();
+
+			jsonGenerator.writeStringField(FIELD_NAME_JOB_ID, jobDetails.getJobId().toString());
+			jsonGenerator.writeStringField(FIELD_NAME_JOB_NAME, jobDetails.getJobName());
+			jsonGenerator.writeStringField(FIELD_NAME_STATUS, jobDetails.getStatus().name());
+
+			jsonGenerator.writeNumberField(FIELD_NAME_START_TIME, jobDetails.getStartTime());
+			jsonGenerator.writeNumberField(FIELD_NAME_END_TIME, jobDetails.getEndTime());
+			jsonGenerator.writeNumberField(FIELD_NAME_DURATION, jobDetails.getDuration());
+			jsonGenerator.writeNumberField(FIELD_NAME_LAST_MODIFICATION, jobDetails.getLastUpdateTime());
+
+			jsonGenerator.writeObjectFieldStart("tasks");
+			jsonGenerator.writeNumberField(FIELD_NAME_TOTAL_NUMBER_TASKS, jobDetails.getNumTasks());
+
+			final int[] perState = jobDetails.getTasksPerState();
+
+			for (ExecutionState executionState : ExecutionState.values()) {
+				jsonGenerator.writeNumberField(executionState.name().toLowerCase(), perState[executionState.ordinal()]);
+			}
+
+			jsonGenerator.writeEndObject();
+
+			jsonGenerator.writeEndObject();
+		}
+	}
+
+	public static final class JobDetailsDeserializer extends StdDeserializer<JobDetails> {
+
+		private static final long serialVersionUID = 6089784742093294800L;
+
+		public JobDetailsDeserializer() {
+			super(JobDetails.class);
+		}
+
+		@Override
+		public JobDetails deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
+
+			JsonNode rootNode = jsonParser.readValueAsTree();
+
+			JobID jobId = JobID.fromHexString(rootNode.get(FIELD_NAME_JOB_ID).textValue());
+			String jobName = rootNode.get(FIELD_NAME_JOB_NAME).textValue();
+			long startTime = rootNode.get(FIELD_NAME_START_TIME).longValue();
+			long endTime = rootNode.get(FIELD_NAME_END_TIME).longValue();
+			long duration = rootNode.get(FIELD_NAME_DURATION).longValue();
+			JobStatus jobStatus = JobStatus.valueOf(rootNode.get(FIELD_NAME_STATUS).textValue());
+			long lastUpdateTime = rootNode.get(FIELD_NAME_LAST_MODIFICATION).longValue();
+
+			JsonNode tasksNode = rootNode.get("tasks");
+			int numTasks = tasksNode.get(FIELD_NAME_TOTAL_NUMBER_TASKS).intValue();
+
+			int[] numVerticesPerExecutionState = new int[ExecutionState.values().length];
+
+			for (ExecutionState executionState : ExecutionState.values()) {
+				numVerticesPerExecutionState[executionState.ordinal()] = tasksNode.get(executionState.name().toLowerCase()).intValue();
+			}
+
+			return new JobDetails(
+				jobId,
+				jobName,
+				startTime,
+				endTime,
+				duration,
+				jobStatus,
+				lastUpdateTime,
+				numVerticesPerExecutionState,
+				numTasks);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
index 47de58a..31eb1cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
@@ -18,64 +18,95 @@
 
 package org.apache.flink.runtime.messages.webmonitor;
 
-import java.util.Arrays;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
 
 /**
  * An actor messages describing details of various jobs. This message is sent for example
- * in response to the {@link org.apache.flink.runtime.messages.webmonitor.RequestJobDetails}
- * message.
+ * in response to the {@link RequestJobDetails} message.
  */
-public class MultipleJobsDetails implements java.io.Serializable {
+public class MultipleJobsDetails implements ResponseBody, Serializable {
 
 	private static final long serialVersionUID = -1526236139616019127L;
 	
-	private static final JobDetails[] EMPTY = new JobDetails[0];
-	
-	private final JobDetails[] runningJobs;
-	private final JobDetails[] finishedJobs;
+	public static final String FIELD_NAME_JOBS_RUNNING = "running";
+	public static final String FIELD_NAME_JOBS_FINISHED = "finished";
 
-	public MultipleJobsDetails(JobDetails[] running, JobDetails[] finished) {
-		this.runningJobs = running == null ? EMPTY : running;
-		this.finishedJobs = finished == null ? EMPTY : finished;
+	@JsonProperty(FIELD_NAME_JOBS_RUNNING)
+	private final Collection<JobDetails> running;
+
+	@JsonProperty(FIELD_NAME_JOBS_FINISHED)
+	private final Collection<JobDetails> finished;
+
+	@JsonCreator
+	public MultipleJobsDetails(
+			@JsonProperty(FIELD_NAME_JOBS_RUNNING) Collection<JobDetails> running,
+			@JsonProperty(FIELD_NAME_JOBS_FINISHED) Collection<JobDetails> finished) {
+		this.running = running == null ? Collections.emptyList() : running;
+		this.finished = finished == null ? Collections.emptyList() : finished;
 	}
 	
 	// ------------------------------------------------------------------------
 
-	public JobDetails[] getRunningJobs() {
-		return runningJobs;
+	public Collection<JobDetails> getRunning() {
+		return running;
 	}
 
-	public JobDetails[] getFinishedJobs() {
-		return finishedJobs;
+	public Collection<JobDetails> getFinished() {
+		return finished;
 	}
 
-	// ------------------------------------------------------------------------
-
 	@Override
-	public int hashCode() {
-		return Arrays.deepHashCode(runningJobs) + Arrays.deepHashCode(finishedJobs);
+	public String toString() {
+		return "MultipleJobsDetails{" +
+			"running=" + running +
+			", finished=" + finished +
+			'}';
 	}
 
 	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
+	public boolean equals(Object o) {
+		if (this == o) {
 			return true;
 		}
-		else if (obj instanceof MultipleJobsDetails) {
-			MultipleJobsDetails that = (MultipleJobsDetails) obj;
-			return Arrays.deepEquals(this.runningJobs, that.runningJobs) &&
-					Arrays.deepEquals(this.finishedJobs, that.finishedJobs);
-		}
-		else {
+		if (o == null || getClass() != o.getClass()) {
 			return false;
 		}
+		MultipleJobsDetails that = (MultipleJobsDetails) o;
+
+		return CollectionUtils.isEqualCollection(running, that.running) &&
+			CollectionUtils.isEqualCollection(finished, that.finished);
 	}
 
 	@Override
-	public String toString() {
-		return "MultipleJobsDetails {" +
-				"running=" + Arrays.toString(runningJobs) +
-				", finished=" + Arrays.toString(finishedJobs) +
-				'}';
+	public int hashCode() {
+		// the hash code only depends on the collection elements, not the collection itself!
+		int result = 1;
+
+		Iterator<JobDetails> iterator = running.iterator();
+
+		while (iterator.hasNext()) {
+			JobDetails jobDetails = iterator.next();
+			result = 31 * result + (jobDetails == null ? 0 : jobDetails.hashCode());
+		}
+
+		iterator = finished.iterator();
+
+		while (iterator.hasNext()) {
+			JobDetails jobDetails = iterator.next();
+			result = 31 * result + (jobDetails == null ? 0 : jobDetails.hashCode());
+		}
+
+		return result;
 	}
+
+	// ------------------------------------------------------------------------
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
index 480c9e8..794ff20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -50,8 +50,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ClusterOverviewHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, StatusOverviewWithVersion, EmptyMessageParameters> {
 
-
-
 	private static final String version = EnvironmentInformation.getVersion();
 
 	private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
index 6f85320..b1939e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
@@ -21,11 +21,15 @@ package org.apache.flink.runtime.rest.handler.legacy;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -45,7 +49,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Request handler that returns a summary of the job status.
  */
-public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
+public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, MultipleJobsDetails, EmptyMessageParameters> {
 
 	private static final String ALL_JOBS_REST_PATH = "/joboverview";
 	private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
@@ -69,6 +73,11 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
+	public CompletableFuture<MultipleJobsDetails> handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, DispatcherGateway gateway) {
+		return gateway.requestJobDetails(timeout);
+	}
+
+	@Override
 	public String[] getPaths() {
 		if (includeRunningJobs && includeFinishedJobs) {
 			return new String[]{ALL_JOBS_REST_PATH};
@@ -92,24 +101,26 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 					StringWriter writer = new StringWriter();
 					try {
 						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+						final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
+
 						gen.writeStartObject();
 
 						if (includeRunningJobs && includeFinishedJobs) {
-							gen.writeArrayFieldStart("running");
-							for (JobDetails detail : result.getRunningJobs()) {
-								writeJobDetailOverviewAsJson(detail, gen, now);
+							gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
+							for (JobDetails detail : result.getRunning()) {
+								jobDetailsSerializer.serialize(detail, gen, null);
 							}
 							gen.writeEndArray();
 
-							gen.writeArrayFieldStart("finished");
-							for (JobDetails detail : result.getFinishedJobs()) {
-								writeJobDetailOverviewAsJson(detail, gen, now);
+							gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
+							for (JobDetails detail : result.getFinished()) {
+								jobDetailsSerializer.serialize(detail, gen, null);
 							}
 							gen.writeEndArray();
 						} else {
 							gen.writeArrayFieldStart("jobs");
-							for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
-								writeJobDetailOverviewAsJson(detail, gen, now);
+							for (JobDetails detail : includeRunningJobs ? result.getRunning() : result.getFinished()) {
+								jobDetailsSerializer.serialize(detail, gen, null);
 							}
 							gen.writeEndArray();
 						}
@@ -138,10 +149,13 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 			StringWriter writer = new StringWriter();
 			try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
 				gen.writeStartObject();
-				gen.writeArrayFieldStart("running");
+				gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
 				gen.writeEndArray();
-				gen.writeArrayFieldStart("finished");
-				writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
+				gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
+
+				final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
+				jobDetailsSerializer.serialize(WebMonitorUtils.createDetailsForJob(graph), gen, null);
+
 				gen.writeEndArray();
 				gen.writeEndObject();
 			}
@@ -150,33 +164,4 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 			return Collections.singleton(new ArchivedJson(path, json));
 		}
 	}
-
-	public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException {
-		gen.writeStartObject();
-
-		gen.writeStringField("jid", details.getJobId().toString());
-		gen.writeStringField("name", details.getJobName());
-		gen.writeStringField("state", details.getStatus().name());
-
-		gen.writeNumberField("start-time", details.getStartTime());
-		gen.writeNumberField("end-time", details.getEndTime());
-		gen.writeNumberField("duration", (details.getEndTime() <= 0 ? now : details.getEndTime()) - details.getStartTime());
-		gen.writeNumberField("last-modification", details.getLastUpdateTime());
-
-		gen.writeObjectFieldStart("tasks");
-		gen.writeNumberField("total", details.getNumTasks());
-
-		final int[] perState = details.getNumVerticesPerExecutionState();
-		gen.writeNumberField("pending", perState[ExecutionState.CREATED.ordinal()] +
-				perState[ExecutionState.SCHEDULED.ordinal()] +
-				perState[ExecutionState.DEPLOYING.ordinal()]);
-		gen.writeNumberField("running", perState[ExecutionState.RUNNING.ordinal()]);
-		gen.writeNumberField("finished", perState[ExecutionState.FINISHED.ordinal()]);
-		gen.writeNumberField("canceling", perState[ExecutionState.CANCELING.ordinal()]);
-		gen.writeNumberField("canceled", perState[ExecutionState.CANCELED.ordinal()]);
-		gen.writeNumberField("failed", perState[ExecutionState.FAILED.ordinal()]);
-		gen.writeEndObject();
-
-		gen.writeEndObject();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index 9f53808..c114ee6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -113,10 +113,10 @@ public class MetricFetcher {
 							LOG.debug("Fetching of JobDetails failed.", throwable);
 						} else {
 							ArrayList<String> toRetain = new ArrayList<>();
-							for (JobDetails job : jobDetails.getRunningJobs()) {
+							for (JobDetails job : jobDetails.getRunning()) {
 								toRetain.add(job.getJobId().toString());
 							}
-							for (JobDetails job : jobDetails.getFinishedJobs()) {
+							for (JobDetails job : jobDetails.getFinished()) {
 								toRetain.add(job.getJobId().toString());
 							}
 							synchronized (metrics) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java
new file mode 100644
index 0000000..f97c601
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for {@link CurrentJobsOverviewHandler}.
+ */
+public final class CurrentJobsOverviewHandlerHeaders implements MessageHeaders<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+
+	private static final CurrentJobsOverviewHandlerHeaders INSTANCE = new CurrentJobsOverviewHandlerHeaders();
+
+	// make this class a singleton
+	private CurrentJobsOverviewHandlerHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/joboverview";
+	}
+
+	@Override
+	public Class<MultipleJobsDetails> getResponseClass() {
+		return MultipleJobsDetails.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance();
+	}
+
+	public static CurrentJobsOverviewHandlerHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index e0f1823..0accab7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -253,6 +253,7 @@ public final class WebMonitorUtils {
 
 		long started = job.getStatusTimestamp(JobStatus.CREATED);
 		long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L;
+		long duration = (finished >= 0L ? finished : System.currentTimeMillis()) - started;
 
 		int[] countsPerStatus = new int[ExecutionState.values().length];
 		long lastChanged = 0;
@@ -271,9 +272,16 @@ public final class WebMonitorUtils {
 
 		lastChanged = Math.max(lastChanged, finished);
 
-		return new JobDetails(job.getJobID(), job.getJobName(),
-				started, finished, status, lastChanged,
-				countsPerStatus, numTotalTasks);
+		return new JobDetails(
+			job.getJobID(),
+			job.getJobName(),
+			started,
+			finished,
+			duration,
+			status,
+			lastChanged,
+			countsPerStatus,
+			numTotalTasks);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 67ffb32..76707b5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import java.io.IOException
 import java.net._
+import java.util
 import java.util.UUID
 import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _}
 import java.util.function.{BiFunction, Consumer}
@@ -1694,10 +1695,12 @@ class JobManager(
             val future = (archive ? msg)(timeout)
             future.onSuccess {
               case archiveDetails: MultipleJobsDetails =>
-                theSender ! new MultipleJobsDetails(ourDetails, archiveDetails.getFinishedJobs())
+                theSender ! new MultipleJobsDetails(
+                  util.Arrays.asList(ourDetails: _*),
+                  archiveDetails.getFinished())
             }(context.dispatcher)
           } else {
-            theSender ! new MultipleJobsDetails(ourDetails, null)
+            theSender ! new MultipleJobsDetails(util.Arrays.asList(ourDetails: _*), null)
           }
           
         case _ => log.error("Unrecognized info message " + actorMessage)

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 327e2a3..c963238 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -189,7 +189,7 @@ class MemoryArchivist(
           v => WebMonitorUtils.createDetailsForJob(v)
         }.toArray[JobDetails]
 
-        theSender ! decorateMessage(new MultipleJobsDetails(null, details))
+        theSender ! decorateMessage(new MultipleJobsDetails(null, util.Arrays.asList(details: _*)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
index 8d281f7..f5f4976 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 
@@ -85,8 +87,8 @@ public class WebMonitorMessagesTest {
 			JobID jid = GenericMessageTester.randomJobId(rnd);
 			JobStatus status = GenericMessageTester.randomJobStatus(rnd);
 			
-			JobDetails msg1 = new JobDetails(jid, name, time, endTime, status, lastModified, numVerticesPerState, numTotal);
-			JobDetails msg2 = new JobDetails(jid, name, time, endTime, status, lastModified, numVerticesPerState, numTotal);
+			JobDetails msg1 = new JobDetails(jid, name, time, endTime, endTime - time, status, lastModified, numVerticesPerState, numTotal);
+			JobDetails msg2 = new JobDetails(jid, name, time, endTime, endTime - time, status, lastModified, numVerticesPerState, numTotal);
 			
 			GenericMessageTester.testMessageInstances(msg1, msg2);
 		}
@@ -120,7 +122,7 @@ public class WebMonitorMessagesTest {
 		return ids;
 	}
 	
-	private JobDetails[] randomJobDetails(Random rnd) {
+	private Collection<JobDetails> randomJobDetails(Random rnd) {
 		final JobDetails[] details = new JobDetails[rnd.nextInt(10)];
 		for (int k = 0; k < details.length; k++) {
 			int[] numVerticesPerState = new int[ExecutionState.values().length];
@@ -140,8 +142,8 @@ public class WebMonitorMessagesTest {
 			JobID jid = new JobID();
 			JobStatus status = JobStatus.values()[rnd.nextInt(JobStatus.values().length)];
 
-			details[k] = new JobDetails(jid, name, time, endTime, status, lastModified, numVerticesPerState, numTotal);
+			details[k] = new JobDetails(jid, name, time, endTime, endTime - time, status, lastModified, numVerticesPerState, numTotal);
 		}
-		return details;
+		return Arrays.asList(details);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
new file mode 100644
index 0000000..96a97a7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link JobDetails}.
+ */
+public class JobDetailsTest extends TestLogger {
+
+	/**
+	 * Tests that we can marshal and unmarshal JobDetails instances.
+	 */
+	@Test
+	public void testJobDetailsMarshalling() throws JsonProcessingException {
+		final JobDetails expected = new JobDetails(
+			new JobID(),
+			"foobar",
+			1L,
+			10L,
+			9L,
+			JobStatus.RUNNING,
+			8L,
+			new int[]{1, 3, 3, 7, 4, 2, 7, 3, 3},
+			42);
+
+		final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+		final JsonNode marshalled = objectMapper.valueToTree(expected);
+
+		final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class);
+
+		assertEquals(expected, unmarshalled);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java
new file mode 100644
index 0000000..23f012a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link MultipleJobsDetails} class.
+ */
+public class MultipleJobsDetailsTest extends TestLogger {
+
+	/**
+	 * Tests that we can un/marshal {@link MultipleJobsDetails} objects.
+	 */
+	@Test
+	public void testMultipleJobsDetailsMarshalling() throws JsonProcessingException {
+		int[] verticesPerState = new int[ExecutionState.values().length];
+
+		for (int i = 0; i < verticesPerState.length; i++) {
+			verticesPerState[i] = i;
+		}
+
+		final JobDetails running = new JobDetails(
+			new JobID(),
+			"running",
+			1L,
+			-1L,
+			9L,
+			JobStatus.RUNNING,
+			9L,
+			verticesPerState,
+			9);
+
+		final JobDetails finished = new JobDetails(
+			new JobID(),
+			"finished",
+			1L,
+			5L,
+			4L,
+			JobStatus.FINISHED,
+			8L,
+			verticesPerState,
+			4);
+
+		final MultipleJobsDetails expected = new MultipleJobsDetails(
+			Collections.singleton(running),
+			Collections.singleton(finished));
+
+		final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+		final JsonNode marshalled = objectMapper.valueToTree(expected);
+
+		final MultipleJobsDetails unmarshalled = objectMapper.treeToValue(marshalled, MultipleJobsDetails.class);
+
+		assertEquals(expected, unmarshalled);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
index 83bb157..c9326c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
@@ -89,7 +89,8 @@ public class CurrentJobsOverviewHandlerTest {
 		JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
 		StringWriter writer = new StringWriter();
 		try (JsonGenerator gen = ArchivedJobGenerationUtils.JACKSON_FACTORY.createGenerator(writer)) {
-			CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 0);
+			JobDetails.JobDetailsSerializer serializer = new JobDetails.JobDetailsSerializer();
+			serializer.serialize(expectedDetails, gen, null);
 		}
 		compareJobOverview(expectedDetails, writer.toString());
 	}
@@ -108,14 +109,10 @@ public class CurrentJobsOverviewHandlerTest {
 
 		JsonNode tasks = result.get("tasks");
 		Assert.assertEquals(expectedDetails.getNumTasks(), tasks.get("total").asInt());
-		int[] tasksPerState = expectedDetails.getNumVerticesPerExecutionState();
-		Assert.assertEquals(
-			tasksPerState[ExecutionState.CREATED.ordinal()] + tasksPerState[ExecutionState.SCHEDULED.ordinal()] + tasksPerState[ExecutionState.DEPLOYING.ordinal()],
-			tasks.get("pending").asInt());
-		Assert.assertEquals(tasksPerState[ExecutionState.RUNNING.ordinal()], tasks.get("running").asInt());
-		Assert.assertEquals(tasksPerState[ExecutionState.FINISHED.ordinal()], tasks.get("finished").asInt());
-		Assert.assertEquals(tasksPerState[ExecutionState.CANCELING.ordinal()], tasks.get("canceling").asInt());
-		Assert.assertEquals(tasksPerState[ExecutionState.CANCELED.ordinal()], tasks.get("canceled").asInt());
-		Assert.assertEquals(tasksPerState[ExecutionState.FAILED.ordinal()], tasks.get("failed").asInt());
+		int[] tasksPerState = expectedDetails.getTasksPerState();
+
+		for (ExecutionState executionState : ExecutionState.values()) {
+			Assert.assertEquals(tasksPerState[executionState.ordinal()], tasks.get(executionState.name().toLowerCase()).asInt());
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e585aed8/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index b278979..e513dd9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -91,7 +91,7 @@ public class MetricFetcherTest extends TestLogger {
 		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
 
 		when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
+			.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList(), Collections.emptyList())));
 		when(jobManagerGateway.requestTaskManagerInstances(any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture(Collections.singleton(taskManager)));
 		when(jobManagerGateway.getAddress()).thenReturn("/jm/address");