You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:17 UTC

[19/30] flink git commit: [FLINK-7806] [flip6] Register CurrentJobsOverviewHandler under /jobs/overview

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 08946ed..57b5dec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -223,7 +223,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 	}
 
 	@Override
-	public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) {
+	public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {
 		return FutureUtils.toJava(
 			jobManagerGateway
 				.ask(new RequestJobDetails(true, true), FutureUtils.toFiniteDuration(timeout))

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index cf3405b..51d7755 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -330,7 +330,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	@Override
-	public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) {
+	public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {
 		final int numberJobsRunning = jobManagerRunners.size();
 
 		ArrayList<CompletableFuture<JobDetails>> individualJobDetails = new ArrayList<>(numberJobsRunning);

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 8244d95..4de77b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
@@ -37,6 +36,7 @@ import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatis
 import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
-import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
@@ -56,7 +55,6 @@ import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
-import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
 import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -66,6 +64,7 @@ import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
@@ -162,17 +161,12 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 				executor,
 				restConfiguration.getRefreshInterval()));
 
-		LegacyRestHandlerAdapter<DispatcherGateway, MultipleJobsDetails, EmptyMessageParameters> currentJobsOverviewHandler = new LegacyRestHandlerAdapter<>(
+		JobsOverviewHandler<DispatcherGateway> jobsOverviewHandler = new JobsOverviewHandler<>(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
 			responseHeaders,
-			CurrentJobsOverviewHandlerHeaders.getInstance(),
-			new CurrentJobsOverviewHandler(
-				executor,
-				timeout,
-				true,
-				true));
+			JobsOverviewHeaders.getInstance());
 
 		LegacyRestHandlerAdapter<DispatcherGateway, ClusterConfigurationInfo, EmptyMessageParameters> clusterConfigurationHandler = new LegacyRestHandlerAdapter<>(
 			restAddressFuture,
@@ -331,7 +325,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
 		handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
 		handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler));
-		handlers.add(Tuple2.of(CurrentJobsOverviewHandlerHeaders.getInstance(), currentJobsOverviewHandler));
+		handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
 		handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
 		handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
 		handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
new file mode 100644
index 0000000..46aa887
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * REST handler which returns the {@link MultipleJobsDetails} obtained from the leader gateway's {@code requestJobDetails} call.
+ *
+ * @param <T> type of the leader gateway
+ */
+public class JobsOverviewHandler<T extends RestfulGateway> extends AbstractRestHandler<T, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+
+	public JobsOverviewHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends T> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> messageHeaders) {
+		super(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			messageHeaders);
+	}
+
+	@Override
+	protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull T gateway) throws RestHandlerException {
+		return gateway.requestJobDetails(timeout);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
index 98e95c0..30e7fec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -41,7 +42,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
-import static org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders.CLUSTER_OVERVIEW_REST_PATH;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -63,7 +63,7 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler implement
 
 	@Override
 	public String[] getPaths() {
-		return new String[]{CLUSTER_OVERVIEW_REST_PATH};
+		return new String[]{ClusterOverviewHeaders.URL};
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
deleted file mode 100644
index 27edac8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.util.FlinkException;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Request handler that returns a summary of the job status.
- */
-public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, MultipleJobsDetails, EmptyMessageParameters> {
-
-	private static final String ALL_JOBS_REST_PATH = "/joboverview";
-	private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
-	private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed";
-
-	private final Time timeout;
-
-	private final boolean includeRunningJobs;
-	private final boolean includeFinishedJobs;
-
-	public CurrentJobsOverviewHandler(
-			Executor executor,
-			Time timeout,
-			boolean includeRunningJobs,
-			boolean includeFinishedJobs) {
-
-		super(executor);
-		this.timeout = checkNotNull(timeout);
-		this.includeRunningJobs = includeRunningJobs;
-		this.includeFinishedJobs = includeFinishedJobs;
-	}
-
-	@Override
-	public CompletableFuture<MultipleJobsDetails> handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, DispatcherGateway gateway) {
-		return gateway.requestJobDetails(true, true, timeout);
-	}
-
-	@Override
-	public String[] getPaths() {
-		if (includeRunningJobs && includeFinishedJobs) {
-			return new String[]{ALL_JOBS_REST_PATH};
-		}
-		if (includeRunningJobs) {
-			return new String[]{RUNNING_JOBS_REST_PATH};
-		} else {
-			return new String[]{COMPLETED_JOBS_REST_PATH};
-		}
-	}
-
-	@Override
-	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-		if (jobManagerGateway != null) {
-			CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
-
-			return jobDetailsFuture.thenApplyAsync(
-				(MultipleJobsDetails result) -> {
-					final long now = System.currentTimeMillis();
-
-					StringWriter writer = new StringWriter();
-					try {
-						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-						final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
-
-						gen.writeStartObject();
-
-						if (includeRunningJobs && includeFinishedJobs) {
-							gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
-							for (JobDetails detail : result.getRunning()) {
-								jobDetailsSerializer.serialize(detail, gen, null);
-							}
-							gen.writeEndArray();
-
-							gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
-							for (JobDetails detail : result.getFinished()) {
-								jobDetailsSerializer.serialize(detail, gen, null);
-							}
-							gen.writeEndArray();
-						} else {
-							gen.writeArrayFieldStart("jobs");
-							for (JobDetails detail : includeRunningJobs ? result.getRunning() : result.getFinished()) {
-								jobDetailsSerializer.serialize(detail, gen, null);
-							}
-							gen.writeEndArray();
-						}
-
-						gen.writeEndObject();
-						gen.close();
-						return writer.toString();
-					} catch (IOException e) {
-						throw new CompletionException(new FlinkException("Could not write current jobs overview json.", e));
-					}
-				},
-				executor);
-		}
-		else {
-			return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
-		}
-	}
-
-	/**
-	 * Archivist for the CurrentJobsOverviewHandler.
-	 */
-	public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist {
-
-		@Override
-		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-			StringWriter writer = new StringWriter();
-			try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
-				gen.writeStartObject();
-				gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
-				gen.writeEndArray();
-				gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
-
-				final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
-				jobDetailsSerializer.serialize(WebMonitorUtils.createDetailsForJob(graph), gen, null);
-
-				gen.writeEndArray();
-				gen.writeEndObject();
-			}
-			String json = writer.toString();
-			String path = ALL_JOBS_REST_PATH;
-			return Collections.singleton(new ArchivedJson(path, json));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
new file mode 100644
index 0000000..6aa75b2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns a JSON overview of the running and finished jobs.
+ */
+public class JobsOverviewHandler extends AbstractJsonRequestHandler {
+
+	private static final String ALL_JOBS_REST_PATH = JobsOverviewHeaders.URL;
+
+	private final Time timeout;
+
+	public JobsOverviewHandler(
+			Executor executor,
+			Time timeout) {
+
+		super(executor);
+		this.timeout = checkNotNull(timeout);
+	}
+
+	@Override
+	public String[] getPaths() {
+		return new String[]{ALL_JOBS_REST_PATH};
+	}
+
+	@Override
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		if (jobManagerGateway != null) {
+			CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(timeout);
+
+			return jobDetailsFuture.thenApplyAsync(
+				(MultipleJobsDetails result) -> {
+
+					StringWriter writer = new StringWriter();
+					try {
+						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+						final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
+
+						gen.writeStartObject();
+
+						gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
+						for (JobDetails detail : result.getRunning()) {
+							jobDetailsSerializer.serialize(detail, gen, null);
+						}
+						gen.writeEndArray();
+
+						gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
+						for (JobDetails detail : result.getFinished()) {
+							jobDetailsSerializer.serialize(detail, gen, null);
+						}
+						gen.writeEndArray();
+
+						gen.writeEndObject();
+						gen.close();
+						return writer.toString();
+					} catch (IOException e) {
+						throw new CompletionException(new FlinkException("Could not write current jobs overview json.", e));
+					}
+				},
+				executor);
+		}
+		else {
+			return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
+		}
+	}
+
+	/**
+	 * Archivist for the {@link JobsOverviewHandler}.
+	 */
+	public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist {
+
+		@Override
+		public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+			StringWriter writer = new StringWriter();
+			try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
+				gen.writeStartObject();
+				gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
+				gen.writeEndArray();
+				gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
+
+				final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
+				jobDetailsSerializer.serialize(WebMonitorUtils.createDetailsForJob(graph), gen, null);
+
+				gen.writeEndArray();
+				gen.writeEndObject();
+			}
+			String json = writer.toString();
+			String path = ALL_JOBS_REST_PATH;
+			return Collections.singleton(new ArchivedJson(path, json));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index e71a1d7..e89ed55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -103,13 +103,10 @@ public class MetricFetcher<T extends RestfulGateway> {
 			if (optionalLeaderGateway.isPresent()) {
 				final T leaderGateway = optionalLeaderGateway.get();
 
-				/**
+				/*
 				 * Remove all metrics that belong to a job that is not running and no longer archived.
 				 */
-				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestJobDetails(
-					true,
-					true,
-					timeout);
+				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = leaderGateway.requestJobDetails(timeout);
 
 				jobDetailsFuture.whenCompleteAsync(
 					(MultipleJobsDetails jobDetails, Throwable throwable) -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
index 688be79..b6ae0f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
@@ -31,7 +31,7 @@ public final class ClusterOverviewHeaders implements MessageHeaders<EmptyRequest
 
 	private static final ClusterOverviewHeaders INSTANCE = new ClusterOverviewHeaders();
 
-	public static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
+	public static final String URL = "/overview";
 
 	// make this class a singleton
 	private ClusterOverviewHeaders() {}
@@ -48,7 +48,7 @@ public final class ClusterOverviewHeaders implements MessageHeaders<EmptyRequest
 
 	@Override
 	public String getTargetRestEndpointURL() {
-		return CLUSTER_OVERVIEW_REST_PATH;
+		return URL;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java
deleted file mode 100644
index f97c601..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CurrentJobsOverviewHandlerHeaders.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * Message headers for {@link CurrentJobsOverviewHandler}.
- */
-public final class CurrentJobsOverviewHandlerHeaders implements MessageHeaders<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
-
-	private static final CurrentJobsOverviewHandlerHeaders INSTANCE = new CurrentJobsOverviewHandlerHeaders();
-
-	// make this class a singleton
-	private CurrentJobsOverviewHandlerHeaders() {}
-
-	@Override
-	public Class<EmptyRequestBody> getRequestClass() {
-		return EmptyRequestBody.class;
-	}
-
-	@Override
-	public HttpMethodWrapper getHttpMethod() {
-		return HttpMethodWrapper.GET;
-	}
-
-	@Override
-	public String getTargetRestEndpointURL() {
-		return "/joboverview";
-	}
-
-	@Override
-	public Class<MultipleJobsDetails> getResponseClass() {
-		return MultipleJobsDetails.class;
-	}
-
-	@Override
-	public HttpResponseStatus getResponseStatusCode() {
-		return HttpResponseStatus.OK;
-	}
-
-	@Override
-	public EmptyMessageParameters getUnresolvedMessageParameters() {
-		return EmptyMessageParameters.getInstance();
-	}
-
-	public static CurrentJobsOverviewHandlerHeaders getInstance() {
-		return INSTANCE;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobsOverviewHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobsOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobsOverviewHeaders.java
new file mode 100644
index 0000000..5be841c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobsOverviewHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for {@link JobsOverviewHandler}.
+ */
+public final class JobsOverviewHeaders implements MessageHeaders<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+
+	private static final JobsOverviewHeaders INSTANCE = new JobsOverviewHeaders();
+
+	public static final String URL = "/jobs/overview";
+
+	// make this class a singleton
+	private JobsOverviewHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	@Override
+	public Class<MultipleJobsDetails> getResponseClass() {
+		return MultipleJobsDetails.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance();
+	}
+
+	public static JobsOverviewHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 331e96b..d5de8e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -63,14 +63,10 @@ public interface RestfulGateway extends RpcGateway {
 	/**
 	 * Requests job details currently being executed on the Flink cluster.
 	 *
-	 * @param includeRunning true if running jobs shall be included, otherwise false
-	 * @param includeFinished true if finished jobs shall be included, otherwise false
 	 * @param timeout for the asynchronous operation
 	 * @return Future containing the job details
 	 */
 	CompletableFuture<MultipleJobsDetails> requestJobDetails(
-		boolean includeRunning,
-		boolean includeFinished,
 		@RpcTimeout Time timeout);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
deleted file mode 100644
index c9f91d4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandlerTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-
-/**
- * Tests for the CurrentJobsOverviewHandler.
- */
-public class CurrentJobsOverviewHandlerTest {
-
-	@Test
-	public void testArchiver() throws Exception {
-		JsonArchivist archivist = new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist();
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-		JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
-
-		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
-		Assert.assertEquals(1, archives.size());
-
-		ArchivedJson archive = archives.iterator().next();
-		Assert.assertEquals("/joboverview", archive.getPath());
-
-		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(archive.getJson());
-		ArrayNode running = (ArrayNode) result.get("running");
-		Assert.assertEquals(0, running.size());
-
-		ArrayNode finished = (ArrayNode) result.get("finished");
-		Assert.assertEquals(1, finished.size());
-
-		compareJobOverview(expectedDetails, finished.get(0).toString());
-	}
-
-	@Test
-	public void testGetPaths() {
-		CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, true);
-		String[] pathsAll = handlerAll.getPaths();
-		Assert.assertEquals(1, pathsAll.length);
-		Assert.assertEquals("/joboverview", pathsAll[0]);
-
-		CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, false);
-		String[] pathsRunning = handlerRunning.getPaths();
-		Assert.assertEquals(1, pathsRunning.length);
-		Assert.assertEquals("/joboverview/running", pathsRunning[0]);
-
-		CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), false, true);
-		String[] pathsCompleted = handlerCompleted.getPaths();
-		Assert.assertEquals(1, pathsCompleted.length);
-		Assert.assertEquals("/joboverview/completed", pathsCompleted[0]);
-	}
-
-	@Test
-	public void testJsonGeneration() throws Exception {
-		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
-		JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
-		StringWriter writer = new StringWriter();
-		try (JsonGenerator gen = ArchivedJobGenerationUtils.JACKSON_FACTORY.createGenerator(writer)) {
-			JobDetails.JobDetailsSerializer serializer = new JobDetails.JobDetailsSerializer();
-			serializer.serialize(expectedDetails, gen, null);
-		}
-		compareJobOverview(expectedDetails, writer.toString());
-	}
-
-	private static void compareJobOverview(JobDetails expectedDetails, String answer) throws IOException {
-		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(answer);
-
-		Assert.assertEquals(expectedDetails.getJobId().toString(), result.get("jid").asText());
-		Assert.assertEquals(expectedDetails.getJobName(), result.get("name").asText());
-		Assert.assertEquals(expectedDetails.getStatus().name(), result.get("state").asText());
-
-		Assert.assertEquals(expectedDetails.getStartTime(), result.get("start-time").asLong());
-		Assert.assertEquals(expectedDetails.getEndTime(), result.get("end-time").asLong());
-		Assert.assertEquals(expectedDetails.getEndTime() - expectedDetails.getStartTime(), result.get("duration").asLong());
-		Assert.assertEquals(expectedDetails.getLastUpdateTime(), result.get("last-modification").asLong());
-
-		JsonNode tasks = result.get("tasks");
-		Assert.assertEquals(expectedDetails.getNumTasks(), tasks.get("total").asInt());
-		int[] tasksPerState = expectedDetails.getTasksPerState();
-
-		for (ExecutionState executionState : ExecutionState.values()) {
-			Assert.assertEquals(tasksPerState[executionState.ordinal()], tasks.get(executionState.name().toLowerCase()).asInt());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
new file mode 100644
index 0000000..ea40376
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+
+/**
+ * Tests for the {@link JobsOverviewHandler}.
+ */
+public class JobsOverviewHandlerTest extends TestLogger {
+
+	@Test
+	public void testArchiver() throws Exception {
+		JsonArchivist archivist = new JobsOverviewHandler.CurrentJobsOverviewJsonArchivist();
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
+
+		Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob);
+		Assert.assertEquals(1, archives.size());
+
+		ArchivedJson archive = archives.iterator().next();
+		Assert.assertEquals(JobsOverviewHeaders.URL, archive.getPath());
+
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(archive.getJson());
+		ArrayNode running = (ArrayNode) result.get("running");
+		Assert.assertEquals(0, running.size());
+
+		ArrayNode finished = (ArrayNode) result.get("finished");
+		Assert.assertEquals(1, finished.size());
+
+		compareJobOverview(expectedDetails, finished.get(0).toString());
+	}
+
+	@Test
+	public void testGetPaths() {
+		JobsOverviewHandler handlerAll = new JobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L));
+		String[] pathsAll = handlerAll.getPaths();
+		Assert.assertEquals(1, pathsAll.length);
+		Assert.assertEquals(JobsOverviewHeaders.URL, pathsAll[0]);
+	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
+		StringWriter writer = new StringWriter();
+		try (JsonGenerator gen = ArchivedJobGenerationUtils.JACKSON_FACTORY.createGenerator(writer)) {
+			JobDetails.JobDetailsSerializer serializer = new JobDetails.JobDetailsSerializer();
+			serializer.serialize(expectedDetails, gen, null);
+		}
+		compareJobOverview(expectedDetails, writer.toString());
+	}
+
+	private static void compareJobOverview(JobDetails expectedDetails, String answer) throws IOException {
+		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(answer);
+
+		Assert.assertEquals(expectedDetails.getJobId().toString(), result.get("jid").asText());
+		Assert.assertEquals(expectedDetails.getJobName(), result.get("name").asText());
+		Assert.assertEquals(expectedDetails.getStatus().name(), result.get("state").asText());
+
+		Assert.assertEquals(expectedDetails.getStartTime(), result.get("start-time").asLong());
+		Assert.assertEquals(expectedDetails.getEndTime(), result.get("end-time").asLong());
+		Assert.assertEquals(expectedDetails.getEndTime() - expectedDetails.getStartTime(), result.get("duration").asLong());
+		Assert.assertEquals(expectedDetails.getLastUpdateTime(), result.get("last-modification").asLong());
+
+		JsonNode tasks = result.get("tasks");
+		Assert.assertEquals(expectedDetails.getNumTasks(), tasks.get("total").asInt());
+		int[] tasksPerState = expectedDetails.getTasksPerState();
+
+		for (ExecutionState executionState : ExecutionState.values()) {
+			Assert.assertEquals(tasksPerState[executionState.ordinal()], tasks.get(executionState.name().toLowerCase()).asInt());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8086e3be/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index faa97a7..e157b0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -54,7 +54,6 @@ import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
@@ -82,7 +81,7 @@ public class MetricFetcherTest extends TestLogger {
 
 		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
 
-		when(jobManagerGateway.requestJobDetails(anyBoolean(), anyBoolean(), any(Time.class)))
+		when(jobManagerGateway.requestJobDetails(any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList(), Collections.emptyList())));
 		when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn(
 			CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)));