You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/29 17:04:03 UTC

[1/2] flink git commit: [FLINK-7652] [flip6] Introduce JobIdWithStatus type for JobIdsWithStatusOverview

Repository: flink
Updated Branches:
  refs/heads/master 731896ae7 -> 9fc3c71f8


[FLINK-7652] [flip6] Introduce JobIdWithStatus type for JobIdsWithStatusOverview

Instead of storing the JobID and the JobStatus in a Tuple2 which is serialized
as an array of values in JSON, this commit introduces the JobIdWithStatus which
is serialized as a proper JSON object with an id and a status field.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fc3c71f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fc3c71f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fc3c71f

Branch: refs/heads/master
Commit: 9fc3c71f81a2728fd60c7ce26023c23fa2ced37a
Parents: 67aad88
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 29 16:00:46 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 18:03:41 2017 +0100

----------------------------------------------------------------------
 .../runtime/akka/AkkaJobManagerGateway.java     |   6 +-
 .../runtime/dispatcher/DispatcherGateway.java   |   9 --
 .../dispatcher/DispatcherRestEndpoint.java      |  10 +-
 .../runtime/jobmaster/JobManagerGateway.java    |   4 +-
 .../webmonitor/JobIdsWithStatusOverview.java    | 161 +++++++++++++++++++
 .../webmonitor/JobIdsWithStatusesOverview.java  | 152 -----------------
 .../runtime/rest/handler/job/JobIdsHandler.java |  21 ++-
 .../handler/legacy/CurrentJobIdsHandler.java    |  16 +-
 .../JobIdsWithStatusesOverviewHeaders.java      |  10 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  14 +-
 .../runtime/jobmanager/MemoryArchivist.scala    |  10 +-
 .../messages/WebMonitorMessagesTest.java        |  17 +-
 .../JobIdsWithStatusOverviewTest.java           |  46 ++++++
 .../JobIdsWithStatusesOverviewTest.java         |  47 ------
 14 files changed, 262 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index f7f8898..0a465b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
@@ -283,11 +283,11 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 	}
 
 	@Override
-	public CompletableFuture<JobIdsWithStatusesOverview> requestJobsOverview(Time timeout) {
+	public CompletableFuture<JobIdsWithStatusOverview> requestJobsOverview(Time timeout) {
 		return FutureUtils.toJava(
 			jobManagerGateway
 				.ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
-				.mapTo(ClassTag$.MODULE$.apply(JobIdsWithStatusesOverview.class)));
+				.mapTo(ClassTag$.MODULE$.apply(JobIdsWithStatusOverview.class)));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 51e9112..12cbbfb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -80,12 +79,4 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
 	 * @return A future integer of the blob server port
 	 */
 	CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);
-
-	/**
-	 * Request details of submitted jobs.
-	 *
-	 * @param timeout RPC timeout
-	 * @return A future of the details of submitted jobs.
-	 */
-	CompletableFuture<MultipleJobsDetails> requestJobDetails(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 4556bb8..44eb3da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -165,19 +165,19 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			DashboardConfigurationHeaders.getInstance(),
 			restConfiguration.getRefreshInterval());
 
-		JobsOverviewHandler<DispatcherGateway> jobsOverviewHandler = new JobsOverviewHandler<>(
+		JobIdsHandler<DispatcherGateway> jobIdsHandler = new JobIdsHandler<>(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
 			responseHeaders,
-			JobsOverviewHeaders.getInstance());
+			JobIdsWithStatusesOverviewHeaders.getInstance());
 
-		JobIdsHandler<DispatcherGateway> jobIdsHandler = new JobIdsHandler<>(
+		JobsOverviewHandler<DispatcherGateway> jobsOverviewHandler = new JobsOverviewHandler<>(
 			restAddressFuture,
 			leaderRetriever,
 			timeout,
 			responseHeaders,
-			JobIdsWithStatusesOverviewHeaders.getInstance());
+			JobsOverviewHeaders.getInstance());
 
 		ClusterConfigHandler<DispatcherGateway> clusterConfigurationHandler = new ClusterConfigHandler<>(
 			restAddressFuture,
@@ -378,8 +378,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
 		handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
 		handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigHandler));
-		handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
 		handlers.add(Tuple2.of(JobIdsWithStatusesOverviewHeaders.getInstance(), jobIdsHandler));
+		handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
 		handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
 		handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
 		handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index 84f42fc..1695853 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 
 import javax.annotation.Nullable;
@@ -127,5 +127,5 @@ public interface JobManagerGateway extends RestfulGateway {
 	 * @param timeout for the asynchronous operation
 	 * @return Future containing the job overview
 	 */
-	CompletableFuture<JobIdsWithStatusesOverview> requestJobsOverview(Time timeout);
+	CompletableFuture<JobIdsWithStatusOverview> requestJobsOverview(Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverview.java
new file mode 100644
index 0000000..63c23f2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverview.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An overview of all jobs, pairing each job's id with its current status.
+ */
+public class JobIdsWithStatusOverview implements ResponseBody, InfoMessage {
+
+	private static final long serialVersionUID = -3699051943490133183L;
+
+	public static final String FIELD_NAME_JOBS = "jobs";
+
+	@JsonProperty(FIELD_NAME_JOBS)
+	private final Collection<JobIdWithStatus> jobsWithStatus;
+
+	@JsonCreator
+	public JobIdsWithStatusOverview(
+			@JsonProperty(FIELD_NAME_JOBS) Collection<JobIdWithStatus> jobsWithStatus) {
+		this.jobsWithStatus = checkNotNull(jobsWithStatus);
+	}
+
+	public JobIdsWithStatusOverview(JobIdsWithStatusOverview first, JobIdsWithStatusOverview second) {
+		this.jobsWithStatus = combine(first.getJobsWithStatus(), second.getJobsWithStatus());
+	}
+
+	public Collection<JobIdWithStatus> getJobsWithStatus() {
+		return jobsWithStatus;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return jobsWithStatus.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		else if (obj instanceof JobIdsWithStatusOverview) {
+			JobIdsWithStatusOverview that = (JobIdsWithStatusOverview) obj;
+			return jobsWithStatus.equals(that.getJobsWithStatus());
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "JobIdsWithStatusOverview { " + jobsWithStatus + " }";
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static Collection<JobIdWithStatus> combine(
+			Collection<JobIdWithStatus> first,
+			Collection<JobIdWithStatus> second) {
+		checkNotNull(first);
+		checkNotNull(second);
+
+		ArrayList<JobIdWithStatus> result = new ArrayList<>(first.size() + second.size());
+
+		result.addAll(first);
+		result.addAll(second);
+
+		return result;
+	}
+
+	// -------------------------------------------------------------------------
+	// Static classes
+	// -------------------------------------------------------------------------
+
+	public static final class JobIdWithStatus implements Serializable {
+
+		private static final long serialVersionUID = -499449819268733026L;
+
+		public static final String FIELD_NAME_JOB_ID = "id";
+
+		public static final String FIELD_NAME_JOB_STATUS = "status";
+
+		@JsonProperty(FIELD_NAME_JOB_ID)
+		@JsonSerialize(using = JobIDSerializer.class)
+		private final JobID jobId;
+
+		@JsonProperty(FIELD_NAME_JOB_STATUS)
+		private final JobStatus jobStatus;
+
+		@JsonCreator
+		public JobIdWithStatus(
+			@JsonProperty(FIELD_NAME_JOB_ID) @JsonDeserialize(using = JobIDDeserializer.class) JobID jobId,
+			@JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus) {
+			this.jobId = Preconditions.checkNotNull(jobId);
+			this.jobStatus = Preconditions.checkNotNull(jobStatus);
+		}
+
+		public JobID getJobId() {
+			return jobId;
+		}
+
+		public JobStatus getJobStatus() {
+			return jobStatus;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			JobIdWithStatus that = (JobIdWithStatus) o;
+			return Objects.equals(jobId, that.jobId) &&
+				jobStatus == that.jobStatus;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(jobId, jobStatus);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
deleted file mode 100644
index 59c1e2c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.rest.messages.ResponseBody;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An overview of how many jobs are in which status.
- */
-public class JobIdsWithStatusesOverview implements ResponseBody, InfoMessage {
-
-	private static final long serialVersionUID = -3699051943490133183L;
-
-	public static final String FIELD_NAME_JOB_IDS = "job-ids";
-
-	@JsonProperty(FIELD_NAME_JOB_IDS)
-	@JsonSerialize(contentUsing = JobIdsWithStatusesOverview.JobIdWithStatusSerializer.class)
-	private final Collection<Tuple2<JobID, JobStatus>> jobIds;
-
-	@JsonCreator
-	public JobIdsWithStatusesOverview(
-			@JsonProperty(FIELD_NAME_JOB_IDS) @JsonDeserialize(contentUsing = JobIdsWithStatusesOverview.JobIdWithStatusDeserializer.class) Collection<Tuple2<JobID, JobStatus>> jobIds) {
-		this.jobIds = checkNotNull(jobIds);
-	}
-
-	public JobIdsWithStatusesOverview(JobIdsWithStatusesOverview first, JobIdsWithStatusesOverview second) {
-		this.jobIds = combine(first.getJobIds(), second.getJobIds());
-	}
-
-	public Collection<Tuple2<JobID, JobStatus>> getJobIds() {
-		return jobIds;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public int hashCode() {
-		return jobIds.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-		else if (obj instanceof JobIdsWithStatusesOverview) {
-			JobIdsWithStatusesOverview that = (JobIdsWithStatusesOverview) obj;
-			return jobIds.equals(that.getJobIds());
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "JobIdsWithStatusesOverview { " + jobIds + " }";
-	}
-
-	public static final class JobIdWithStatusSerializer extends StdSerializer<Tuple2<JobID, JobStatus>> {
-
-		private static final long serialVersionUID = 2196011372021674535L;
-
-		public JobIdWithStatusSerializer() {
-			super(Tuple2.class, true);
-		}
-
-		@Override
-		public void serialize(
-				Tuple2<JobID, JobStatus> jobIdWithStatus,
-				JsonGenerator jsonGenerator,
-				SerializerProvider serializerProvider) throws IOException {
-
-			jsonGenerator.writeStartArray();
-
-			jsonGenerator.writeString(jobIdWithStatus.f0.toString());
-			jsonGenerator.writeString(jobIdWithStatus.f1.name());
-
-			jsonGenerator.writeEndArray();
-		}
-	}
-
-	public static final class JobIdWithStatusDeserializer extends StdDeserializer<Tuple2<JobID, JobStatus>> {
-
-		private static final long serialVersionUID = 5378670283978134794L;
-
-		public JobIdWithStatusDeserializer() {
-			super(Tuple2.class);
-		}
-
-		@Override
-		public Tuple2<JobID, JobStatus> deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
-			Tuple2<JobID, JobStatus> deserializedIdWithStatus = Tuple2.of(
-				JobID.fromHexString(jsonParser.nextTextValue()), JobStatus.valueOf(jsonParser.nextTextValue()));
-
-			// read the END_ARRAY token, otherwise it will be mistaken as the end of the whole collection of ids and statuses
-			jsonParser.nextValue();
-
-			return deserializedIdWithStatus;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static Collection<Tuple2<JobID, JobStatus>> combine(
-			Collection<Tuple2<JobID, JobStatus>> first,
-			Collection<Tuple2<JobID, JobStatus>> second) {
-
-		checkNotNull(first);
-		checkNotNull(second);
-		ArrayList<Tuple2<JobID, JobStatus>> result = new ArrayList<>(first.size() + second.size());
-		result.addAll(first);
-		result.addAll(second);
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
index 9eea83c..0fdf18c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
@@ -19,8 +19,7 @@
 package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -42,14 +41,14 @@ import java.util.stream.Collectors;
  * @param <T> type of the leader gateway
  */
 public class JobIdsHandler<T extends RestfulGateway>
-		extends AbstractRestHandler<T, EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> {
+		extends AbstractRestHandler<T, EmptyRequestBody, JobIdsWithStatusOverview, EmptyMessageParameters> {
 
 	public JobIdsHandler(
 			CompletableFuture<String> localRestAddress,
 			GatewayRetriever<? extends T> leaderRetriever,
 			Time timeout,
 			Map<String, String> responseHeaders,
-			MessageHeaders<EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> messageHeaders) {
+			MessageHeaders<EmptyRequestBody, JobIdsWithStatusOverview, EmptyMessageParameters> messageHeaders) {
 		super(
 			localRestAddress,
 			leaderRetriever,
@@ -59,13 +58,21 @@ public class JobIdsHandler<T extends RestfulGateway>
 	}
 
 	@Override
-	protected CompletableFuture<JobIdsWithStatusesOverview> handleRequest(
+	protected CompletableFuture<JobIdsWithStatusOverview> handleRequest(
 			@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
 			@Nonnull T gateway) throws RestHandlerException {
 
 		return gateway.requestJobDetails(timeout).thenApply(
-			multipleJobDetails -> new JobIdsWithStatusesOverview(
-				multipleJobDetails.getJobs().stream().map(jobDetails -> Tuple2.of(jobDetails.getJobId(), jobDetails.getStatus())).collect(Collectors.toList())
+			multipleJobDetails -> new JobIdsWithStatusOverview(
+				multipleJobDetails
+					.getJobs()
+					.stream()
+					.map(
+						jobDetails ->
+							new JobIdsWithStatusOverview.JobIdWithStatus(
+								jobDetails.getJobId(),
+								jobDetails.getStatus()))
+					.collect(Collectors.toList())
 			)
 		);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
index 87f54fb..cf8a3d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
@@ -18,12 +18,9 @@
 
 package org.apache.flink.runtime.rest.handler.legacy;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
 import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
 import org.apache.flink.util.FlinkException;
 
@@ -68,18 +65,17 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 				// we need no parameters, get all requests
 				try {
 					if (jobManagerGateway != null) {
-						CompletableFuture<JobIdsWithStatusesOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
-						JobIdsWithStatusesOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+						CompletableFuture<JobIdsWithStatusOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+						JobIdsWithStatusOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 						StringWriter writer = new StringWriter();
 						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
 						gen.writeStartObject();
-						gen.writeArrayFieldStart(JobIdsWithStatusesOverview.FIELD_NAME_JOB_IDS);
+						gen.writeArrayFieldStart(JobIdsWithStatusOverview.FIELD_NAME_JOBS);
 
-						final JobIdsWithStatusesOverview.JobIdWithStatusSerializer jobIdWithStatusSerializer = new JobIdsWithStatusesOverview.JobIdWithStatusSerializer();
-						for (Tuple2<JobID, JobStatus> jobIdWithStatus : overview.getJobIds()) {
-							jobIdWithStatusSerializer.serialize(jobIdWithStatus, gen, null);
+						for (JobIdsWithStatusOverview.JobIdWithStatus jobIdWithStatus : overview.getJobsWithStatus()) {
+							gen.writeObject(jobIdWithStatus);
 						}
 
 						gen.writeEndArray();

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
index 4dee21a..a9af911 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.rest.messages;
 
-import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /**
- * Message headers for the {@link JobIdsWithStatusesOverview}.
+ * Message headers for the {@link JobIdsWithStatusOverview}.
  */
-public class JobIdsWithStatusesOverviewHeaders implements MessageHeaders<EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> {
+public class JobIdsWithStatusesOverviewHeaders implements MessageHeaders<EmptyRequestBody, JobIdsWithStatusOverview, EmptyMessageParameters> {
 
 	public static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
 
@@ -50,8 +50,8 @@ public class JobIdsWithStatusesOverviewHeaders implements MessageHeaders<EmptyRe
 	}
 
 	@Override
-	public Class<JobIdsWithStatusesOverview> getResponseClass() {
-		return JobIdsWithStatusesOverview.class;
+	public Class<JobIdsWithStatusOverview> getResponseClass() {
+		return JobIdsWithStatusOverview.class;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2aeaa6e..7799a78 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -69,6 +69,7 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview.JobIdWithStatus
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
 import org.apache.flink.runtime.messages.{Acknowledge, FlinkJobNotFoundException, StackTrace}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
@@ -1627,8 +1628,8 @@ class JobManager(
           val future = (archive ? RequestJobsWithIDsOverview.getInstance())(timeout)
 
           future.onSuccess {
-            case archiveOverview: JobIdsWithStatusesOverview =>
-              theSender ! new JobIdsWithStatusesOverview(ourJobs, archiveOverview)
+            case archiveOverview: JobIdsWithStatusOverview =>
+              theSender ! new JobIdsWithStatusOverview(ourJobs, archiveOverview)
           }(context.dispatcher)
 
         case _ : RequestStatusOverview =>
@@ -1694,17 +1695,16 @@ class JobManager(
     new JobsOverview(runningOrPending, finished, canceled, failed)
   }
 
-  private def createJobStatusWithIDsOverview() : JobIdsWithStatusesOverview = {
+  private def createJobStatusWithIDsOverview() : JobIdsWithStatusOverview = {
     val jobIdsWithStatuses =
-      new java.util.ArrayList[
-        org.apache.flink.api.java.tuple.Tuple2[JobID, JobStatus]](currentJobs.size)
+      new java.util.ArrayList[JobIdWithStatus](currentJobs.size)
 
     currentJobs.values.foreach { job =>
       jobIdsWithStatuses.add(
-        org.apache.flink.api.java.tuple.Tuple2.of(job._1.getJobID, job._1.getState))
+        new JobIdWithStatus(job._1.getJobID, job._1.getState))
     }
 
-    new JobIdsWithStatusesOverview(jobIdsWithStatuses)
+    new JobIdsWithStatusOverview(jobIdsWithStatuses)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 2efdfb2..de42ab2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, Executio
 import org.apache.flink.runtime.history.FsJobArchivist
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview.JobIdWithStatus
 
 import scala.collection.mutable
 import scala.concurrent.future
@@ -222,17 +223,16 @@ class MemoryArchivist(
     new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
   }
 
-  private def createJobsWithIDsOverview() : JobIdsWithStatusesOverview = {
+  private def createJobsWithIDsOverview() : JobIdsWithStatusOverview = {
     val jobIdsWithStatuses =
-      new java.util.ArrayList[
-        org.apache.flink.api.java.tuple.Tuple2[JobID, JobStatus]](graphs.size)
+      new java.util.ArrayList[JobIdWithStatus](graphs.size)
 
     graphs.values.foreach { graph =>
       jobIdsWithStatuses.add(
-        org.apache.flink.api.java.tuple.Tuple2.of(graph.getJobID, graph.getState))
+        new JobIdWithStatus(graph.getJobID, graph.getState))
     }
 
-    new JobIdsWithStatusesOverview(jobIdsWithStatuses)
+    new JobIdsWithStatusOverview(jobIdsWithStatuses)
   }
 
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
index 0221929..c56f4f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -19,18 +19,17 @@
 package org.apache.flink.runtime.messages;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
-import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 
 import org.junit.Test;
 
@@ -58,12 +57,12 @@ public class WebMonitorMessagesTest {
 			GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(ClusterOverview.class, rnd));
 			GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(JobsOverview.class, rnd));
 			
-			GenericMessageTester.testMessageInstance(new JobIdsWithStatusesOverview(Arrays.asList(
-				Tuple2.of(JobID.generate(), JobStatus.RUNNING),
-				Tuple2.of(JobID.generate(), JobStatus.CANCELED),
-				Tuple2.of(JobID.generate(), JobStatus.CREATED),
-				Tuple2.of(JobID.generate(), JobStatus.FAILED),
-				Tuple2.of(JobID.generate(), JobStatus.RESTARTING))));
+			GenericMessageTester.testMessageInstance(new JobIdsWithStatusOverview(Arrays.asList(
+				new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.RUNNING),
+				new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.CANCELED),
+				new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.CREATED),
+				new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.FAILED),
+				new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.RESTARTING))));
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverviewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverviewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverviewTest.java
new file mode 100644
index 0000000..faf9cc0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusOverviewTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.Arrays;
+
+/**
+ * Marshalling test for the {@link JobIdsWithStatusOverview} message.
+ */
+public class JobIdsWithStatusOverviewTest extends RestResponseMarshallingTestBase<JobIdsWithStatusOverview> {
+
+	@Override
+	protected Class<JobIdsWithStatusOverview> getTestResponseClass() {
+		return JobIdsWithStatusOverview.class;
+	}
+
+	@Override
+	protected JobIdsWithStatusOverview getTestResponseInstance() {
+		return new JobIdsWithStatusOverview(Arrays.asList(
+			new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.RUNNING),
+			new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.CANCELED),
+			new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.CREATED),
+			new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.FAILED),
+			new JobIdsWithStatusOverview.JobIdWithStatus(JobID.generate(), JobStatus.RESTARTING)));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fc3c71f/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
deleted file mode 100644
index 21a20c9..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
-
-import java.util.Arrays;
-
-/**
- * Marshalling test for the {@link JobIdsWithStatusesOverview} message.
- */
-public class JobIdsWithStatusesOverviewTest extends RestResponseMarshallingTestBase<JobIdsWithStatusesOverview> {
-
-	@Override
-	protected Class<JobIdsWithStatusesOverview> getTestResponseClass() {
-		return JobIdsWithStatusesOverview.class;
-	}
-
-	@Override
-	protected JobIdsWithStatusesOverview getTestResponseInstance() {
-		return new JobIdsWithStatusesOverview(Arrays.asList(
-			Tuple2.of(JobID.generate(), JobStatus.RUNNING),
-			Tuple2.of(JobID.generate(), JobStatus.CANCELED),
-			Tuple2.of(JobID.generate(), JobStatus.CREATED),
-			Tuple2.of(JobID.generate(), JobStatus.FAILED),
-			Tuple2.of(JobID.generate(), JobStatus.RESTARTING)));
-	}
-}


[2/2] flink git commit: [FLINK-7652] [flip6] Port CurrentJobIdsHandler to new REST endpoint

Posted by tr...@apache.org.
[FLINK-7652] [flip6] Port CurrentJobIdsHandler to new REST endpoint

This closes #4734.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67aad88e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67aad88e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67aad88e

Branch: refs/heads/master
Commit: 67aad88ee025ce02053ab560f2504762f53b87d9
Parents: 731896a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Sep 27 13:02:32 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 18:03:41 2017 +0100

----------------------------------------------------------------------
 .../runtime/akka/AkkaJobManagerGateway.java     |   6 +-
 .../runtime/dispatcher/DispatcherGateway.java   |   9 ++
 .../dispatcher/DispatcherRestEndpoint.java      |  10 ++
 .../runtime/jobmaster/JobManagerGateway.java    |   4 +-
 .../flink/runtime/jobmaster/JobMaster.java      |   3 +-
 .../webmonitor/JobIdsWithStatusesOverview.java  | 152 +++++++++++++++++++
 .../webmonitor/JobsWithIDsOverview.java         | 120 ---------------
 .../runtime/rest/handler/job/JobIdsHandler.java |  72 +++++++++
 .../handler/legacy/CurrentJobIdsHandler.java    |  45 +++---
 .../JobIdsWithStatusesOverviewHeaders.java      |  70 +++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  27 ++--
 .../runtime/jobmanager/MemoryArchivist.scala    |  19 +--
 .../messages/WebMonitorMessagesTest.java        |  11 +-
 .../JobIdsWithStatusesOverviewTest.java         |  47 ++++++
 14 files changed, 410 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 57b5dec..f7f8898 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
@@ -283,11 +283,11 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 	}
 
 	@Override
-	public CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout) {
+	public CompletableFuture<JobIdsWithStatusesOverview> requestJobsOverview(Time timeout) {
 		return FutureUtils.toJava(
 			jobManagerGateway
 				.ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
-				.mapTo(ClassTag$.MODULE$.apply(JobsWithIDsOverview.class)));
+				.mapTo(ClassTag$.MODULE$.apply(JobIdsWithStatusesOverview.class)));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 12cbbfb..51e9112 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -79,4 +80,12 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
 	 * @return A future integer of the blob server port
 	 */
 	CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);
+
+	/**
+	 * Request details of submitted jobs.
+	 *
+	 * @param timeout RPC timeout
+	 * @return A future of the details of submitted jobs.
+	 */
+	CompletableFuture<MultipleJobsDetails> requestJobDetails(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index a99151a..4556bb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
+import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
@@ -170,6 +172,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			responseHeaders,
 			JobsOverviewHeaders.getInstance());
 
+		JobIdsHandler<DispatcherGateway> jobIdsHandler = new JobIdsHandler<>(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobIdsWithStatusesOverviewHeaders.getInstance());
+
 		ClusterConfigHandler<DispatcherGateway> clusterConfigurationHandler = new ClusterConfigHandler<>(
 			restAddressFuture,
 			leaderRetriever,
@@ -370,6 +379,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(ClusterConfigurationInfoHeaders.getInstance(), clusterConfigurationHandler));
 		handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigHandler));
 		handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));
+		handlers.add(Tuple2.of(JobIdsWithStatusesOverviewHeaders.getInstance(), jobIdsHandler));
 		handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
 		handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
 		handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index 2527e46..84f42fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 
 import javax.annotation.Nullable;
@@ -127,5 +127,5 @@ public interface JobManagerGateway extends RestfulGateway {
 	 * @param timeout for the asynchronous operation
 	 * @return Future containing the job overview
 	 */
-	CompletableFuture<JobsWithIDsOverview> requestJobsOverview(Time timeout);
+	CompletableFuture<JobIdsWithStatusesOverview> requestJobsOverview(Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 4535290..e2fb65f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -86,6 +86,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -742,7 +743,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	@Override
-	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+	public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout) {
 		return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(executionGraph), executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
new file mode 100644
index 0000000..59c1e2c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverview.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An overview of job IDs, each paired with the current status of that job.
+ */
+public class JobIdsWithStatusesOverview implements ResponseBody, InfoMessage {
+
+	private static final long serialVersionUID = -3699051943490133183L;
+
+	public static final String FIELD_NAME_JOB_IDS = "job-ids";
+
+	@JsonProperty(FIELD_NAME_JOB_IDS)
+	@JsonSerialize(contentUsing = JobIdsWithStatusesOverview.JobIdWithStatusSerializer.class)
+	private final Collection<Tuple2<JobID, JobStatus>> jobIds;
+
+	@JsonCreator
+	public JobIdsWithStatusesOverview(
+			@JsonProperty(FIELD_NAME_JOB_IDS) @JsonDeserialize(contentUsing = JobIdsWithStatusesOverview.JobIdWithStatusDeserializer.class) Collection<Tuple2<JobID, JobStatus>> jobIds) {
+		this.jobIds = checkNotNull(jobIds);
+	}
+
+	public JobIdsWithStatusesOverview(JobIdsWithStatusesOverview first, JobIdsWithStatusesOverview second) {
+		this.jobIds = combine(first.getJobIds(), second.getJobIds());
+	}
+
+	public Collection<Tuple2<JobID, JobStatus>> getJobIds() {
+		return jobIds;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return jobIds.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		else if (obj instanceof JobIdsWithStatusesOverview) {
+			JobIdsWithStatusesOverview that = (JobIdsWithStatusesOverview) obj;
+			return jobIds.equals(that.getJobIds());
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "JobIdsWithStatusesOverview { " + jobIds + " }";
+	}
+
+	public static final class JobIdWithStatusSerializer extends StdSerializer<Tuple2<JobID, JobStatus>> {
+
+		private static final long serialVersionUID = 2196011372021674535L;
+
+		public JobIdWithStatusSerializer() {
+			super(Tuple2.class, true);
+		}
+
+		@Override
+		public void serialize(
+				Tuple2<JobID, JobStatus> jobIdWithStatus,
+				JsonGenerator jsonGenerator,
+				SerializerProvider serializerProvider) throws IOException {
+
+			jsonGenerator.writeStartArray();
+
+			jsonGenerator.writeString(jobIdWithStatus.f0.toString());
+			jsonGenerator.writeString(jobIdWithStatus.f1.name());
+
+			jsonGenerator.writeEndArray();
+		}
+	}
+
+	public static final class JobIdWithStatusDeserializer extends StdDeserializer<Tuple2<JobID, JobStatus>> {
+
+		private static final long serialVersionUID = 5378670283978134794L;
+
+		public JobIdWithStatusDeserializer() {
+			super(Tuple2.class);
+		}
+
+		@Override
+		public Tuple2<JobID, JobStatus> deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
+			Tuple2<JobID, JobStatus> deserializedIdWithStatus = Tuple2.of(
+				JobID.fromHexString(jsonParser.nextTextValue()), JobStatus.valueOf(jsonParser.nextTextValue()));
+
+			// read the END_ARRAY token, otherwise it will be mistaken as the end of the whole collection of ids and statuses
+			jsonParser.nextValue();
+
+			return deserializedIdWithStatus;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static Collection<Tuple2<JobID, JobStatus>> combine(
+			Collection<Tuple2<JobID, JobStatus>> first,
+			Collection<Tuple2<JobID, JobStatus>> second) {
+
+		checkNotNull(first);
+		checkNotNull(second);
+		ArrayList<Tuple2<JobID, JobStatus>> result = new ArrayList<>(first.size() + second.size());
+		result.addAll(first);
+		result.addAll(second);
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
deleted file mode 100644
index 4cb9d8b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobsWithIDsOverview.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.api.common.JobID;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An overview of how many jobs are in which status.
- */
-public class JobsWithIDsOverview implements InfoMessage {
-
-	private static final long serialVersionUID = -3699051943490133183L;
-	
-	private final List<JobID> jobsRunningOrPending;
-	private final List<JobID> jobsFinished;
-	private final List<JobID> jobsCancelled;
-	private final List<JobID> jobsFailed;
-
-	public JobsWithIDsOverview(List<JobID> jobsRunningOrPending, List<JobID> jobsFinished, 
-								List<JobID> jobsCancelled, List<JobID> jobsFailed) {
-		
-		this.jobsRunningOrPending = checkNotNull(jobsRunningOrPending);
-		this.jobsFinished = checkNotNull(jobsFinished);
-		this.jobsCancelled = checkNotNull(jobsCancelled);
-		this.jobsFailed = checkNotNull(jobsFailed);
-	}
-
-	public JobsWithIDsOverview(JobsWithIDsOverview first, JobsWithIDsOverview second) {
-		this.jobsRunningOrPending = combine(first.getJobsRunningOrPending(), second.getJobsRunningOrPending());
-		this.jobsFinished = combine(first.getJobsFinished(), second.getJobsFinished());
-		this.jobsCancelled = combine(first.getJobsCancelled(), second.getJobsCancelled());
-		this.jobsFailed = combine(first.getJobsFailed(), second.getJobsFailed());
-	}
-
-	public List<JobID> getJobsRunningOrPending() {
-		return jobsRunningOrPending;
-	}
-
-	public List<JobID> getJobsFinished() {
-		return jobsFinished;
-	}
-
-	public List<JobID> getJobsCancelled() {
-		return jobsCancelled;
-	}
-
-	public List<JobID> getJobsFailed() {
-		return jobsFailed;
-	}
-	
-	// ------------------------------------------------------------------------
-
-
-	@Override
-	public int hashCode() {
-		return jobsRunningOrPending.hashCode() ^
-				jobsFinished.hashCode() ^
-				jobsCancelled.hashCode() ^
-				jobsFailed.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-		else if (obj instanceof JobsWithIDsOverview) {
-			JobsWithIDsOverview that = (JobsWithIDsOverview) obj;
-			return this.jobsRunningOrPending.equals(that.jobsRunningOrPending) &&
-					this.jobsFinished.equals(that.jobsFinished) &&
-					this.jobsCancelled.equals(that.jobsCancelled) &&
-					this.jobsFailed.equals(that.jobsFailed);
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "JobsOverview {" +
-				"numJobsRunningOrPending=" + jobsRunningOrPending +
-				", numJobsFinished=" + jobsFinished +
-				", numJobsCancelled=" + jobsCancelled +
-				", numJobsFailed=" + jobsFailed +
-				'}';
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static ArrayList<JobID> combine(List<JobID> first, List<JobID> second) {
-		checkNotNull(first);
-		checkNotNull(second);
-		ArrayList<JobID> result = new ArrayList<JobID>(first.size() + second.size());
-		result.addAll(first);
-		result.addAll(second);
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
new file mode 100644
index 0000000..9eea83c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobIdsHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * REST handler that returns the ID and status of every job contained in the gateway's job details.
+ *
+ * @param <T> type of the leader gateway
+ */
+public class JobIdsHandler<T extends RestfulGateway>
+		extends AbstractRestHandler<T, EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> {
+
+	public JobIdsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends T> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> messageHeaders) {
+		super(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			messageHeaders);
+	}
+
+	@Override
+	protected CompletableFuture<JobIdsWithStatusesOverview> handleRequest(
+			@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+			@Nonnull T gateway) throws RestHandlerException {
+
+		return gateway.requestJobDetails(timeout).thenApply(
+			multipleJobDetails -> new JobIdsWithStatusesOverview(
+				multipleJobDetails.getJobs().stream().map(jobDetails -> Tuple2.of(jobDetails.getJobId(), jobDetails.getStatus())).collect(Collectors.toList())
+			)
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
index cbeb957..87f54fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
 import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -42,8 +45,6 @@ import static java.util.Objects.requireNonNull;
  */
 public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
-	private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
-
 	private final Time timeout;
 
 	public CurrentJobIdsHandler(Executor executor, Time timeout) {
@@ -53,51 +54,39 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
 	@Override
 	public String[] getPaths() {
-		return new String[]{CURRENT_JOB_IDS_REST_PATH};
+		return new String[]{JobIdsWithStatusesOverviewHeaders.CURRENT_JOB_IDS_REST_PATH};
 	}
 
 	@Override
-	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+	public CompletableFuture<String> handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway) {
+
 		return CompletableFuture.supplyAsync(
 			() -> {
 				// we need no parameters, get all requests
 				try {
 					if (jobManagerGateway != null) {
-						CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
-						JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+						CompletableFuture<JobIdsWithStatusesOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+						JobIdsWithStatusesOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 						StringWriter writer = new StringWriter();
 						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
 						gen.writeStartObject();
+						gen.writeArrayFieldStart(JobIdsWithStatusesOverview.FIELD_NAME_JOB_IDS);
 
-						gen.writeArrayFieldStart("jobs-running");
-						for (JobID jid : overview.getJobsRunningOrPending()) {
-							gen.writeString(jid.toString());
-						}
-						gen.writeEndArray();
-
-						gen.writeArrayFieldStart("jobs-finished");
-						for (JobID jid : overview.getJobsFinished()) {
-							gen.writeString(jid.toString());
-						}
-						gen.writeEndArray();
-
-						gen.writeArrayFieldStart("jobs-cancelled");
-						for (JobID jid : overview.getJobsCancelled()) {
-							gen.writeString(jid.toString());
+						final JobIdsWithStatusesOverview.JobIdWithStatusSerializer jobIdWithStatusSerializer = new JobIdsWithStatusesOverview.JobIdWithStatusSerializer();
+						for (Tuple2<JobID, JobStatus> jobIdWithStatus : overview.getJobIds()) {
+							jobIdWithStatusSerializer.serialize(jobIdWithStatus, gen, null);
 						}
-						gen.writeEndArray();
 
-						gen.writeArrayFieldStart("jobs-failed");
-						for (JobID jid : overview.getJobsFailed()) {
-							gen.writeString(jid.toString());
-						}
 						gen.writeEndArray();
-
 						gen.writeEndObject();
 
 						gen.close();
+
 						return writer.toString();
 					}
 					else {

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
new file mode 100644
index 0000000..4dee21a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobIdsWithStatusesOverviewHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Singleton message headers for the {@code GET /jobs} endpoint, which responds with a {@link JobIdsWithStatusesOverview} and status OK.
+ */
+public class JobIdsWithStatusesOverviewHeaders implements MessageHeaders<EmptyRequestBody, JobIdsWithStatusesOverview, EmptyMessageParameters> {
+
+	public static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
+
+	private static final JobIdsWithStatusesOverviewHeaders INSTANCE = new JobIdsWithStatusesOverviewHeaders();
+
+	private JobIdsWithStatusesOverviewHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return CURRENT_JOB_IDS_REST_PATH;
+	}
+
+	@Override
+	public Class<JobIdsWithStatusesOverview> getResponseClass() {
+		return JobIdsWithStatusesOverview.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance();
+	}
+
+	public static JobIdsWithStatusesOverviewHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c12db23..2aeaa6e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1627,8 +1627,8 @@ class JobManager(
           val future = (archive ? RequestJobsWithIDsOverview.getInstance())(timeout)
 
           future.onSuccess {
-            case archiveOverview: JobsWithIDsOverview =>
-              theSender ! new JobsWithIDsOverview(ourJobs, archiveOverview)
+            case archiveOverview: JobIdsWithStatusesOverview =>
+              theSender ! new JobIdsWithStatusesOverview(ourJobs, archiveOverview)
           }(context.dispatcher)
 
         case _ : RequestStatusOverview =>
@@ -1694,22 +1694,17 @@ class JobManager(
     new JobsOverview(runningOrPending, finished, canceled, failed)
   }
 
-  private def createJobStatusWithIDsOverview() : JobsWithIDsOverview = {
-    val runningOrPending = new java.util.ArrayList[JobID]()
-    val finished = new java.util.ArrayList[JobID]()
-    val canceled = new java.util.ArrayList[JobID]()
-    val failed = new java.util.ArrayList[JobID]()
-    
-    currentJobs.values.foreach { case (graph, _) =>
-      graph.getState() match {
-        case JobStatus.FINISHED => finished.add(graph.getJobID)
-        case JobStatus.CANCELED => canceled.add(graph.getJobID)
-        case JobStatus.FAILED => failed.add(graph.getJobID)
-        case _ => runningOrPending.add(graph.getJobID)
-      }
+  private def createJobStatusWithIDsOverview() : JobIdsWithStatusesOverview = {
+    val jobIdsWithStatuses =
+      new java.util.ArrayList[
+        org.apache.flink.api.java.tuple.Tuple2[JobID, JobStatus]](currentJobs.size)
+
+    currentJobs.values.foreach { job =>
+      jobIdsWithStatuses.add(
+        org.apache.flink.api.java.tuple.Tuple2.of(job._1.getJobID, job._1.getState))
     }
 
-    new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
+    new JobIdsWithStatusesOverview(jobIdsWithStatuses)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 3885596..2efdfb2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -222,22 +222,17 @@ class MemoryArchivist(
     new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
   }
 
-  private def createJobsWithIDsOverview() : JobsWithIDsOverview = {
-    val runningOrPending = new util.ArrayList[JobID]()
-    val finished = new util.ArrayList[JobID]()
-    val canceled = new util.ArrayList[JobID]()
-    val failed = new util.ArrayList[JobID]()
+  private def createJobsWithIDsOverview() : JobIdsWithStatusesOverview = {
+    val jobIdsWithStatuses =
+      new java.util.ArrayList[
+        org.apache.flink.api.java.tuple.Tuple2[JobID, JobStatus]](graphs.size)
 
     graphs.values.foreach { graph =>
-      graph.getState() match {
-        case JobStatus.FINISHED => finished.add(graph.getJobID)
-        case JobStatus.CANCELED => canceled.add(graph.getJobID)
-        case JobStatus.FAILED => failed.add(graph.getJobID)
-        case _ => runningOrPending.add(graph.getJobID)
-      }
+      jobIdsWithStatuses.add(
+        org.apache.flink.api.java.tuple.Tuple2.of(graph.getJobID, graph.getState))
     }
 
-    new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
+    new JobIdsWithStatusesOverview(jobIdsWithStatuses)
   }
 
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
index 673fa49..0221929 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -19,11 +19,12 @@
 package org.apache.flink.runtime.messages;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusesOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsOverview;
@@ -57,8 +58,12 @@ public class WebMonitorMessagesTest {
 			GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(ClusterOverview.class, rnd));
 			GenericMessageTester.testMessageInstance(GenericMessageTester.instantiateGeneric(JobsOverview.class, rnd));
 			
-			GenericMessageTester.testMessageInstance(new JobsWithIDsOverview(
-					randomIds(rnd), randomIds(rnd), randomIds(rnd), randomIds(rnd)));
+			GenericMessageTester.testMessageInstance(new JobIdsWithStatusesOverview(Arrays.asList(
+				Tuple2.of(JobID.generate(), JobStatus.RUNNING),
+				Tuple2.of(JobID.generate(), JobStatus.CANCELED),
+				Tuple2.of(JobID.generate(), JobStatus.CREATED),
+				Tuple2.of(JobID.generate(), JobStatus.FAILED),
+				Tuple2.of(JobID.generate(), JobStatus.RESTARTING))));
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/67aad88e/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
new file mode 100644
index 0000000..21a20c9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobIdsWithStatusesOverviewTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.Arrays;
+
+/**
+ * Marshalling test for the {@link JobIdsWithStatusesOverview} message, using five generated job IDs with distinct statuses as the sample.
+ */
+public class JobIdsWithStatusesOverviewTest extends RestResponseMarshallingTestBase<JobIdsWithStatusesOverview> {
+
+	@Override
+	protected Class<JobIdsWithStatusesOverview> getTestResponseClass() {
+		return JobIdsWithStatusesOverview.class;
+	}
+
+	@Override
+	protected JobIdsWithStatusesOverview getTestResponseInstance() {
+		return new JobIdsWithStatusesOverview(Arrays.asList(
+			Tuple2.of(JobID.generate(), JobStatus.RUNNING),
+			Tuple2.of(JobID.generate(), JobStatus.CANCELED),
+			Tuple2.of(JobID.generate(), JobStatus.CREATED),
+			Tuple2.of(JobID.generate(), JobStatus.FAILED),
+			Tuple2.of(JobID.generate(), JobStatus.RESTARTING)));
+	}
+}