You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/29 17:04:04 UTC

[2/2] flink git commit: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new REST endpoint

[FLINK-7652] [flip6] Port CurrentJobIdsHandler to new REST endpoint

This closes #4734.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67aad88e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67aad88e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67aad88e

Branch: refs/heads/master
Commit: 67aad88ee025ce02053ab560f2504762f53b87d9
Parents: 731896a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Sep 27 13:02:32 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 18:03:41 2017 +0100

----------------------------------------------------------------------
 .../runtime/akka/AkkaJobManagerGateway.java     |   6 +-
 .../runtime/dispatcher/DispatcherGateway.java   |   9 ++
 .../dispatcher/DispatcherRestEndpoint.java      |  10 ++
 .../runtime/jobmaster/JobManagerGateway.java    |   4 +-
 .../flink/runtime/jobmaster/JobMaster.java      |   3 +-
 .../webmonitor/JobIdsWithStatusesOverview.java  | 152 +++++++++++++++++++
 .../webmonitor/JobsWithIDsOverview.java         | 120 ---------------
 .../runtime/rest/handler/job/JobIdsHandler.java |  72 +++++++++
 .../handler/legacy/CurrentJobIdsHandler.java    |  45 +++---
 .../JobIdsWithStatusesOverviewHeaders.java      |  70 +++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  27 ++--
 .../runtime/jobmanager/MemoryArchivist.scala    |  19 +--
 .../messages/WebMonitorMessagesTest.java        |  11 +-
 .../JobIdsWithStatusesOverviewTest.java         |  47 ++++++
 14 files changed, 410 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 57b5dec..f7f8898 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
@@ -283,11 +283,11 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 	}
 
 	@Override
-	public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout) {
+	public CompletableFuture<JobIdsWithStatusesOverview> requestJobsOverview(Time timeout) {
 		return FutureUtils.toJava(
 			jobManagerGateway
 				.ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
-				.mapTo(ClassTag$.MODULE$.apply(JobsWithIDsOverview.class)));
+				.mapTo(ClassTag$.MODULE$.apply(JobIdsWithStatusesOverview.class)));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 12cbbfb..51e9112 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -79,4 +80,12 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
 	 * @return A future integer of the blob server port
 	 */
 	CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);
+
+	/**
+	 * Request details of submitted jobs.
+	 *
+	 * @param timeout RPC timeout
+	 * @return A future of the details of submitted jobs.
+	 */
+	CompletableFuture<MultipleJobsDetails> requestJobDetails(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index a99151a..4556bb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
+import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
@@ -170,6 +172,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			responseHeaders,
 			JobsOverviewHeaders.getInstance());
 
+		JobIdsHandler<DispatcherGateway> jobIdsHandler = new JobIdsHandler<>(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobIdsWithStatusesOverviewHeaders.getInstance());
+
 		ClusterConfigHandler<DispatcherGateway> clusterConfigurationHandler = new ClusterConfigHandler<>(
 			restAddressFuture,
 			leaderRetriever,
@@ -370,6 +379,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
 		handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigHandler));
 		handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
+		handlers.add(Tuple2.of(JobIdsWithStatusesOverviewHeaders.getInstance(), jobIdsHandler));
 		handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
 		handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
 		handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index 2527e46..84f42fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 
 import javax.annotation.Nullable;
@@ -127,5 +127,5 @@ public interface JobManagerGateway extends RestfulGateway {
 	 * @param timeout for the asynchronous operation
 	 * @return Future containing the job overview
 	 */
-	CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout);
+	CompletableFuture<JobIdsWithStatusesOverview> requestJobsOverview(Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 4535290..e2fb65f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -86,6 +86,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -742,7 +743,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	@Override
-	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+	public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout) {
 		return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(executionGraph), executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
new file mode 100644
index 0000000..59c1e2c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An overview of how many jobs are in which status.
+ */
+public class JobIdsWithStatusesOverview implements ResponseBody, InfoMessage {
+
+	private static final long serialVersionUID = -3699051943490133183L;
+
+	public static final String FIELD_NAME_JOB_IDS = "job-ids";
+
+	@JsonProperty(FIELD_NAME_JOB_IDS)
+	@JsonSerialize(contentUsing = JobIdsWithStatusesOverview.JobIdWithStatusSerializer.class)
+	private final Collection<Tuple2<JobID, JobStatus>> jobIds;
+
+	@JsonCreator
+	public JobIdsWithStatusesOverview(
+			@JsonProperty(FIELD_NAME_JOB_IDS) @JsonDeserialize(contentUsing = JobIdsWithStatusesOverview.JobIdWithStatusDeserializer.class) Collection<Tuple2<JobID, JobStatus>> jobIds) {
+		this.jobIds = checkNotNull(jobIds);
+	}
+
+	public JobIdsWithStatusesOverview(JobIdsWithStatusesOverview first, JobIdsWithStatusesOverview second) {
+		this.jobIds = combine(first.getJobIds(), second.getJobIds());
+	}
+
+	public Collection<Tuple2<JobID, JobStatus>> getJobIds() {
+		return jobIds;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return jobIds.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		else if (obj instanceof JobIdsWithStatusesOverview) {
+			JobIdsWithStatusesOverview that = (JobIdsWithStatusesOverview) obj;
+			return jobIds.equals(that.getJobIds());
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "JobIdsWithStatusesOverview { " + jobIds + " }";
+	}
+
+	public static final class JobIdWithStatusSerializer extends StdSerializer<Tuple2<JobID, JobStatus>> {
+
+		private static final long serialVersionUID = 2196011372021674535L;
+
+		public JobIdWithStatusSerializer() {
+			super(Tuple2.class, true);
+		}
+
+		@Override
+		public void serialize(
+				Tuple2<JobID, JobStatus> jobIdWithStatus,
+				JsonGenerator jsonGenerator,
+				SerializerProvider serializerProvider) throws IOException {
+
+			jsonGenerator.writeStartArray();
+
+			jsonGenerator.writeString(jobIdWithStatus.f0.toString());
+			jsonGenerator.writeString(jobIdWithStatus.f1.name());
+
+			jsonGenerator.writeEndArray();
+		}
+	}
+
+	public static final class JobIdWithStatusDeserializer extends StdDeserializer<Tuple2<JobID, JobStatus>> {
+
+		private static final long serialVersionUID = 5378670283978134794L;
+
+		public JobIdWithStatusDeserializer() {
+			super(Tuple2.class);
+		}
+
+		@Override
+		public Tuple2<JobID, JobStatus> deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
+			Tuple2<JobID, JobStatus> deserializedIdWithStatus = Tuple2.of(
+				JobID.fromHexString(jsonParser.nextTextValue()), JobStatus.valueOf(jsonParser.nextTextValue()));
+
+			// read the END_ARRAY token, otherwise it will be mistaken as the end of the whole collection of ids and statuses
+			jsonParser.nextValue();
+
+			return deserializedIdWithStatus;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static Collection<Tuple2<JobID, JobStatus>> combine(
+			Collection<Tuple2<JobID, JobStatus>> first,
+			Collection<Tuple2<JobID, JobStatus>> second) {
+
+		checkNotNull(first);
+		checkNotNull(second);
+		ArrayList<Tuple2<JobID, JobStatus>> result = new ArrayList<>(first.size() + second.size());
+		result.addAll(first);
+		result.addAll(second);
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
deleted file mode 100644
index 4cb9d8b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.api.common.JobID;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An overview of how many jobs are in which status.
- */
-public class JobsWithIDsOverview implements InfoMessage {
-
-	private static final long serialVersionUID = -3699051943490133183L;
-	
-	private final List<JobID> jobsRunningOrPending;
-	private final List<JobID> jobsFinished;
-	private final List<JobID> jobsCancelled;
-	private final List<JobID> jobsFailed;
-
-	public JobsWithIDsOverview(List<JobID> jobsRunningOrPending, List<JobID> jobsFinished, 
-								List<JobID> jobsCancelled, List<JobID> jobsFailed) {
-		
-		this.jobsRunningOrPending = checkNotNull(jobsRunningOrPending);
-		this.jobsFinished = checkNotNull(jobsFinished);
-		this.jobsCancelled = checkNotNull(jobsCancelled);
-		this.jobsFailed = checkNotNull(jobsFailed);
-	}
-
-	public JobsWithIDsOverview(JobsWithIDsOverview first, JobsWithIDsOverview second) {
-		this.jobsRunningOrPending = combine(first.getJobsRunningOrPending(), second.getJobsRunningOrPending());
-		this.jobsFinished = combine(first.getJobsFinished(), second.getJobsFinished());
-		this.jobsCancelled = combine(first.getJobsCancelled(), second.getJobsCancelled());
-		this.jobsFailed = combine(first.getJobsFailed(), second.getJobsFailed());
-	}
-
-	public List<JobID> getJobsRunningOrPending() {
-		return jobsRunningOrPending;
-	}
-
-	public List<JobID> getJobsFinished() {
-		return jobsFinished;
-	}
-
-	public List<JobID> getJobsCancelled() {
-		return jobsCancelled;
-	}
-
-	public List<JobID> getJobsFailed() {
-		return jobsFailed;
-	}
-	
-	// ------------------------------------------------------------------------
-
-
-	@Override
-	public int hashCode() {
-		return jobsRunningOrPending.hashCode() ^
-				jobsFinished.hashCode() ^
-				jobsCancelled.hashCode() ^
-				jobsFailed.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-		else if (obj instanceof JobsWithIDsOverview) {
-			JobsWithIDsOverview that = (JobsWithIDsOverview) obj;
-			return this.jobsRunningOrPending.equals(that.jobsRunningOrPending) &&
-					this.jobsFinished.equals(that.jobsFinished) &&
-					this.jobsCancelled.equals(that.jobsCancelled) &&
-					this.jobsFailed.equals(that.jobsFailed);
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "JobsOverview {" +
-				"numJobsRunningOrPending=" + jobsRunningOrPending +
-				", numJobsFinished=" + jobsFinished +
-				", numJobsCancelled=" + jobsCancelled +
-				", numJobsFailed=" + jobsFailed +
-				'}';
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static ArrayList<JobID> combine(List<JobID> first, List<JobID> second) {
-		checkNotNull(first);
-		checkNotNull(second);
-		ArrayList<JobID> result = new ArrayList<JobID>(first.size() + second.size());
-		result.addAll(first);
-		result.addAll(second);
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
new file mode 100644
index 0000000..9eea83c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Handler for job IDs.
+ *
+ * @param <T> type of the leader gateway
+ */
+public class JobIdsHandler<T extends RestfulGateway>
+		extends AbstractRestHandler<T, EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> {
+
+	public JobIdsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends T> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> messageHeaders) {
+		super(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			messageHeaders);
+	}
+
+	@Override
+	protected CompletableFuture<JobIdsWithStatusesOverview> handleRequest(
+			@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+			@Nonnull T gateway) throws RestHandlerException {
+
+		return gateway.requestJobDetails(timeout).thenApply(
+			multipleJobDetails -> new JobIdsWithStatusesOverview(
+				multipleJobDetails.getJobs().stream().map(jobDetails -> Tuple2.of(jobDetails.getJobId(), jobDetails.getStatus())).collect(Collectors.toList())
+			)
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
index cbeb957..87f54fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
 import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -42,8 +45,6 @@ import static java.util.Objects.requireNonNull;
  */
 public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
-	private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
-
 	private final Time timeout;
 
 	public CurrentJobIdsHandler(Executor executor, Time timeout) {
@@ -53,51 +54,39 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
 	@Override
 	public String[] getPaths() {
-		return new String[]{CURRENT_JOB_IDS_REST_PATH};
+		return new String[]{JobIdsWithStatusesOverviewHeaders.CURRENT_JOB_IDS_REST_PATH};
 	}
 
 	@Override
-	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+	public CompletableFuture<String> handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway) {
+
 		return CompletableFuture.supplyAsync(
 			() -> {
 				// we need no parameters, get all requests
 				try {
 					if (jobManagerGateway != null) {
-						CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
-						JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+						CompletableFuture<JobIdsWithStatusesOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+						JobIdsWithStatusesOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 						StringWriter writer = new StringWriter();
 						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
 						gen.writeStartObject();
+						gen.writeArrayFieldStart(JobIdsWithStatusesOverview.FIELD_NAME_JOB_IDS);
 
-						gen.writeArrayFieldStart("jobs-running");
-						for (JobID jid : overview.getJobsRunningOrPending()) {
-							gen.writeString(jid.toString());
-						}
-						gen.writeEndArray();
-
-						gen.writeArrayFieldStart("jobs-finished");
-						for (JobID jid : overview.getJobsFinished()) {
-							gen.writeString(jid.toString());
-						}
-						gen.writeEndArray();
-
-						gen.writeArrayFieldStart("jobs-cancelled");
-						for (JobID jid : overview.getJobsCancelled()) {
-							gen.writeString(jid.toString());
+						final JobIdsWithStatusesOverview.JobIdWithStatusSerializer jobIdWithStatusSerializer = new JobIdsWithStatusesOverview.JobIdWithStatusSerializer();
+						for (Tuple2<JobID, JobStatus> jobIdWithStatus : overview.getJobIds()) {
+							jobIdWithStatusSerializer.serialize(jobIdWithStatus, gen, null);
 						}
-						gen.writeEndArray();
 
-						gen.writeArrayFieldStart("jobs-failed");
-						for (JobID jid : overview.getJobsFailed()) {
-							gen.writeString(jid.toString());
-						}
 						gen.writeEndArray();
-
 						gen.writeEndObject();
 
 						gen.close();
+
 						return writer.toString();
 					}
 					else {

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
new file mode 100644
index 0000000..4dee21a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobIdsWithStatusesOverview}.
+ */
+public class JobIdsWithStatusesOverviewHeaders implements MessageHeaders<EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> {
+
+	public static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
+
+	private static final JobIdsWithStatusesOverviewHeaders INSTANCE = new JobIdsWithStatusesOverviewHeaders();
+
+	private JobIdsWithStatusesOverviewHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return CURRENT_JOB_IDS_REST_PATH;
+	}
+
+	@Override
+	public Class<JobIdsWithStatusesOverview> getResponseClass() {
+		return JobIdsWithStatusesOverview.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance();
+	}
+
+	public static JobIdsWithStatusesOverviewHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c12db23..2aeaa6e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1627,8 +1627,8 @@ class JobManager(
           val future = (archive ? RequestJobsWithIDsOverview.getInstance())(timeout)
 
           future.onSuccess {
-            case archiveOverview: JobsWithIDsOverview =>
-              theSender ! new JobsWithIDsOverview(ourJobs, archiveOverview)
+            case archiveOverview: JobIdsWithStatusesOverview =>
+              theSender ! new JobIdsWithStatusesOverview(ourJobs, archiveOverview)
           }(context.dispatcher)
 
         case _ : RequestStatusOverview =>
@@ -1694,22 +1694,17 @@ class JobManager(
     new JobsOverview(runningOrPending, finished, canceled, failed)
   }
 
-  private def createJobStatusWithIDsOverview() : JobsWithIDsOverview = {
-    val runningOrPending = new java.util.ArrayList[JobID]()
-    val finished = new java.util.ArrayList[JobID]()
-    val canceled = new java.util.ArrayList[JobID]()
-    val failed = new java.util.ArrayList[JobID]()
-    
-    currentJobs.values.foreach { case (graph, _) =>
-      graph.getState() match {
-        case JobStatus.FINISHED => finished.add(graph.getJobID)
-        case JobStatus.CANCELED => canceled.add(graph.getJobID)
-        case JobStatus.FAILED => failed.add(graph.getJobID)
-        case _ => runningOrPending.add(graph.getJobID)
-      }
+  private def createJobStatusWithIDsOverview() : JobIdsWithStatusesOverview = {
+    val jobIdsWithStatuses =
+      new java.util.ArrayList[
+        org.apache.flink.api.java.tuple.Tuple2[JobID, JobStatus]](currentJobs.size)
+
+    currentJobs.values.foreach { job =>
+      jobIdsWithStatuses.add(
+        org.apache.flink.api.java.tuple.Tuple2.of(job._1.getJobID, job._1.getState))
     }
 
-    new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
+    new JobIdsWithStatusesOverview(jobIdsWithStatuses)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 3885596..2efdfb2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -222,22 +222,17 @@ class MemoryArchivist(
     new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
   }
 
-  private def createJobsWithIDsOverview() : JobsWithIDsOverview = {
-    val runningOrPending = new util.ArrayList[JobID]()
-    val finished = new util.ArrayList[JobID]()
-    val canceled = new util.ArrayList[JobID]()
-    val failed = new util.ArrayList[JobID]()
+  private def createJobsWithIDsOverview() : JobIdsWithStatusesOverview = {
+    val jobIdsWithStatuses =
+      new java.util.ArrayList[
+        org.apache.flink.api.java.tuple.Tuple2[JobID, JobStatus]](graphs.size)
 
     graphs.values.foreach { graph =>
-      graph.getState() match {
-        case JobStatus.FINISHED => finished.add(graph.getJobID)
-        case JobStatus.CANCELED => canceled.add(graph.getJobID)
-        case JobStatus.FAILED => failed.add(graph.getJobID)
-        case _ => runningOrPending.add(graph.getJobID)
-      }
+      jobIdsWithStatuses.add(
+        org.apache.flink.api.java.tuple.Tuple2.of(graph.getJobID, graph.getState))
     }
 
-    new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
+    new JobIdsWithStatusesOverview(jobIdsWithStatuses)
   }
 
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
index 673fa49..0221929 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -19,11 +19,12 @@
 package org.apache.flink.runtime.messages;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsOverview;
@@ -57,8 +58,12 @@ public class WebMonitorMessagesTest {
 			GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(ClusterOverview.class, rnd));
 			GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(JobsOverview.class, rnd));
 			
-			GenericMessageTester.testMessageInstance(new JobsWithIDsOverview(
-					randomIds(rnd), randomIds(rnd), randomIds(rnd), randomIds(rnd)));
+			GenericMessageTester.testMessageInstance(new JobIdsWithStatusesOverview(Arrays.asList(
+				Tuple2.of(JobID.generate(), JobStatus.RUNNING),
+				Tuple2.of(JobID.generate(), JobStatus.CANCELED),
+				Tuple2.of(JobID.generate(), JobStatus.CREATED),
+				Tuple2.of(JobID.generate(), JobStatus.FAILED),
+				Tuple2.of(JobID.generate(), JobStatus.RESTARTING))));
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
new file mode 100644
index 0000000..21a20c9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.Arrays;
+
+/**
+ * Marshalling test for the {@link JobIdsWithStatusesOverview} message.
+ */
+public class JobIdsWithStatusesOverviewTest extends RestResponseMarshallingTestBase<JobIdsWithStatusesOverview> {
+
+	@Override
+	protected Class<JobIdsWithStatusesOverview> getTestResponseClass() {
+		return JobIdsWithStatusesOverview.class;
+	}
+
+	@Override
+	protected JobIdsWithStatusesOverview getTestResponseInstance() {
+		return new JobIdsWithStatusesOverview(Arrays.asList(
+			Tuple2.of(JobID.generate(), JobStatus.RUNNING),
+			Tuple2.of(JobID.generate(), JobStatus.CANCELED),
+			Tuple2.of(JobID.generate(), JobStatus.CREATED),
+			Tuple2.of(JobID.generate(), JobStatus.FAILED),
+			Tuple2.of(JobID.generate(), JobStatus.RESTARTING)));
+	}
+}