You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:15 UTC

[17/30] flink git commit: [FLINK-7705] Add JobDetailsHandler

[FLINK-7705] Add JobDetailsHandler

Add JobID(De)Serializer and JobVertexID(De)Serializer for jackson

This closes #4884.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de201a6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de201a6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de201a6c

Branch: refs/heads/master
Commit: de201a6c0f8739c12918817d2ef571abb2898f37
Parents: 1c78dee
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 3 09:59:17 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:43 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  14 +
 .../rest/handler/job/JobDetailsHandler.java     | 209 ++++++++++
 .../rest/handler/util/MutableIOMetrics.java     |  20 +
 .../checkpoints/CheckpointStatistics.java       |  12 +-
 .../rest/messages/job/JobDetailsHeaders.java    |  74 ++++
 .../rest/messages/job/JobDetailsInfo.java       | 392 +++++++++++++++++++
 .../rest/messages/json/JobIDDeserializer.java   |  43 ++
 .../rest/messages/json/JobIDSerializer.java     |  44 +++
 .../messages/json/JobVertexIDDeserializer.java  |  15 +-
 .../json/JobVertexIDKeyDeserializer.java        |  37 ++
 .../messages/json/JobVertexIDKeySerializer.java |  44 +++
 .../messages/json/JobVertexIDSerializer.java    |   4 +-
 .../rest/messages/job/JobDetailsInfoTest.java   | 107 +++++
 13 files changed, 1003 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 12187e3..e13fd5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
@@ -67,6 +68,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeader
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
@@ -289,6 +291,17 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			TaskManagerDetailsHeaders.getInstance(),
 			resourceManagerRetriever,
 			metricFetcher);
+
+		final JobDetailsHandler jobDetailsHandler = new JobDetailsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobDetailsHeaders.getInstance(),
+			executionGraphCache,
+			executor,
+			metricFetcher);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -317,6 +330,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
 		handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
 		handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
+		handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));
 		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
 		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
 		handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
new file mode 100644
index 0000000..0c0ee18
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler returning the details for the specified job.
+ */
+public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> {
+
+	private final MetricFetcher<?> metricFetcher;
+
+	public JobDetailsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor,
+			MetricFetcher<?> metricFetcher) {
+		super(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			messageHeaders,
+			executionGraphCache,
+			executor);
+
+		this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
+	}
+
+	@Override
+	protected JobDetailsInfo handleRequest(
+			HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
+			AccessExecutionGraph executionGraph) throws RestHandlerException {
+
+		final long now = System.currentTimeMillis();
+		final long startTime = executionGraph.getStatusTimestamp(JobStatus.CREATED);
+		final long endTime = executionGraph.getState().isGloballyTerminalState() ?
+			executionGraph.getStatusTimestamp(executionGraph.getState()) : -1L;
+		final long duration = (endTime > 0L ? endTime : now) - startTime;
+
+		final Map<JobStatus, Long> timestamps = new HashMap<>(JobStatus.values().length);
+
+		for (JobStatus jobStatus : JobStatus.values()) {
+			timestamps.put(jobStatus, executionGraph.getStatusTimestamp(jobStatus));
+		}
+
+		Collection<JobDetailsInfo.JobVertexDetailsInfo> jobVertexInfos = new ArrayList<>(executionGraph.getAllVertices().size());
+		int[] jobVerticesPerState = new int[ExecutionState.values().length];
+
+		for (AccessExecutionJobVertex accessExecutionJobVertex : executionGraph.getVerticesTopologically()) {
+			final JobDetailsInfo.JobVertexDetailsInfo vertexDetailsInfo = createJobVertexDetailsInfo(
+				accessExecutionJobVertex,
+				now,
+				executionGraph.getJobID(),
+				metricFetcher);
+
+			jobVertexInfos.add(vertexDetailsInfo);
+			jobVerticesPerState[vertexDetailsInfo.getExecutionState().ordinal()]++;
+		}
+
+		Map<ExecutionState, Integer> jobVerticesPerStateMap = new HashMap<>(ExecutionState.values().length);
+
+		for (ExecutionState executionState : ExecutionState.values()) {
+			jobVerticesPerStateMap.put(executionState, jobVerticesPerState[executionState.ordinal()]);
+		}
+
+		return new JobDetailsInfo(
+			executionGraph.getJobID(),
+			executionGraph.getJobName(),
+			executionGraph.isStoppable(),
+			executionGraph.getState(),
+			startTime,
+			endTime,
+			duration,
+			now,
+			timestamps,
+			jobVertexInfos,
+			jobVerticesPerStateMap,
+			executionGraph.getJsonPlan());
+	}
+
+	public static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
+			AccessExecutionJobVertex ejv,
+			long now,
+			JobID jobId,
+			MetricFetcher<?> metricFetcher) {
+		int[] tasksPerState = new int[ExecutionState.values().length];
+		long startTime = Long.MAX_VALUE;
+		long endTime = 0;
+		boolean allFinished = true;
+
+		for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
+			final ExecutionState state = vertex.getExecutionState();
+			tasksPerState[state.ordinal()]++;
+
+			// take the earliest start time
+			long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+			if (started > 0L) {
+				startTime = Math.min(startTime, started);
+			}
+
+			allFinished &= state.isTerminal();
+			endTime = Math.max(endTime, vertex.getStateTimestamp(state));
+		}
+
+		long duration;
+		if (startTime < Long.MAX_VALUE) {
+			if (allFinished) {
+				duration = endTime - startTime;
+			}
+			else {
+				endTime = -1L;
+				duration = now - startTime;
+			}
+		}
+		else {
+			startTime = -1L;
+			endTime = -1L;
+			duration = -1L;
+		}
+
+		ExecutionState jobVertexState =
+			ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism());
+
+		Map<ExecutionState, Integer> tasksPerStateMap = new HashMap<>(tasksPerState.length);
+
+		for (ExecutionState executionState : ExecutionState.values()) {
+			tasksPerStateMap.put(executionState, tasksPerState[executionState.ordinal()]);
+		}
+
+		MutableIOMetrics counts = new MutableIOMetrics();
+
+		for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
+			counts.addIOMetrics(
+				vertex.getCurrentExecutionAttempt(),
+				metricFetcher,
+				jobId.toString(),
+				ejv.getJobVertexId().toString());
+		}
+
+		final JobDetailsInfo.JobVertexMetrics jobVertexMetrics = new JobDetailsInfo.JobVertexMetrics(
+			counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
+			counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
+			counts.getNumBytesOut(),
+			counts.isNumBytesOutComplete(),
+			counts.getNumRecordsIn(),
+			counts.isNumRecordsInComplete(),
+			counts.getNumRecordsOut(),
+			counts.isNumRecordsOutComplete());
+
+		return new JobDetailsInfo.JobVertexDetailsInfo(
+			ejv.getJobVertexId(),
+			ejv.getName(),
+			ejv.getParallelism(),
+			jobVertexState,
+			startTime,
+			endTime,
+			duration,
+			tasksPerStateMap,
+			jobVertexMetrics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index 224e63d..1b172dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -54,6 +54,26 @@ public class MutableIOMetrics extends IOMetrics {
 		super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
 	}
 
+	public boolean isNumBytesInLocalComplete() {
+		return numBytesInLocalComplete;
+	}
+
+	public boolean isNumBytesInRemoteComplete() {
+		return numBytesInRemoteComplete;
+	}
+
+	public boolean isNumBytesOutComplete() {
+		return numBytesOutComplete;
+	}
+
+	public boolean isNumRecordsInComplete() {
+		return numRecordsInComplete;
+	}
+
+	public boolean isNumRecordsOutComplete() {
+		return numRecordsOutComplete;
+	}
+
 	/**
 	 * Adds the IO metrics for the given attempt to this object. If the {@link AccessExecution} is in
 	 * a terminal state the contained {@link IOMetrics} object is added. Otherwise the given {@link MetricFetcher} is

http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
index a7793fb..333c016 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
-import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeyDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeySerializer;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -106,7 +106,7 @@ public class CheckpointStatistics implements ResponseBody {
 	private final int numAckSubtasks;
 
 	@JsonProperty(FIELD_NAME_TASKS)
-	@JsonSerialize(keyUsing = JobVertexIDSerializer.class)
+	@JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
 	private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
 
 	@JsonCreator
@@ -121,7 +121,7 @@ public class CheckpointStatistics implements ResponseBody {
 			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
 			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
 			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
-			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
+			@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
 		this.id = id;
 		this.status = Preconditions.checkNotNull(status);
 		this.savepoint = savepoint;
@@ -309,7 +309,7 @@ public class CheckpointStatistics implements ResponseBody {
 			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
 			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
 			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
-			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
+			@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
 			@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
 			@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
 			super(
@@ -388,7 +388,7 @@ public class CheckpointStatistics implements ResponseBody {
 			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
 			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
 			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
-			@JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
+			@JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
 			@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
 			@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
 			super(

http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsHeaders.java
new file mode 100644
index 0000000..bd0b730
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsHeaders.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobDetailsHandler}.
+ */
+public class JobDetailsHeaders implements MessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> {
+
+	private static final JobDetailsHeaders INSTANCE = new JobDetailsHeaders();
+
+	public static final String URL = "/jobs/:" + JobIDPathParameter.KEY;
+
+	private JobDetailsHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<JobDetailsInfo> getResponseClass() {
+		return JobDetailsInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		return new JobMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static JobDetailsHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
new file mode 100644
index 0000000..e4d04d5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Details about a job.
+ */
+public class JobDetailsInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_JOB_ID = "jid";
+
+	public static final String FIELD_NAME_JOB_NAME = "name";
+
+	public static final String FIELD_NAME_IS_STOPPABLE = "isStoppable";
+
+	public static final String FIELD_NAME_JOB_STATUS = "state";
+
+	public static final String FIELD_NAME_START_TIME = "start-time";
+
+	public static final String FIELD_NAME_END_TIME = "end-time";
+
+	public static final String FIELD_NAME_DURATION = "duration";
+
+	// TODO: For what do we need this???
+	public static final String FIELD_NAME_NOW = "now";
+
+	public static final String FIELD_NAME_TIMESTAMPS = "timestamps";
+
+	public static final String FIELD_NAME_JOB_VERTEX_INFOS = "vertices";
+
+	public static final String FIELD_NAME_JOB_VERTICES_PER_STATE = "status-counts";
+
+	public static final String FIELD_NAME_JSON_PLAN = "plan";
+
+	@JsonProperty(FIELD_NAME_JOB_ID)
+	@JsonSerialize(using = JobIDSerializer.class)
+	private final JobID jobId;
+
+	@JsonProperty(FIELD_NAME_JOB_NAME)
+	private final String name;
+
+	@JsonProperty(FIELD_NAME_IS_STOPPABLE)
+	private final boolean isStoppable;
+
+	@JsonProperty(FIELD_NAME_JOB_STATUS)
+	private final JobStatus jobStatus;
+
+	@JsonProperty(FIELD_NAME_START_TIME)
+	private final long startTime;
+
+	@JsonProperty(FIELD_NAME_END_TIME)
+	private final long endTime;
+
+	@JsonProperty(FIELD_NAME_DURATION)
+	private final long duration;
+
+	@JsonProperty(FIELD_NAME_NOW)
+	private final long now;
+
+	@JsonProperty(FIELD_NAME_TIMESTAMPS)
+	private final Map<JobStatus, Long> timestamps;
+
+	@JsonProperty(FIELD_NAME_JOB_VERTEX_INFOS)
+	private final Collection<JobVertexDetailsInfo> jobVertexInfos;
+
+	@JsonProperty(FIELD_NAME_JOB_VERTICES_PER_STATE)
+	private final Map<ExecutionState, Integer> jobVerticesPerState;
+
+	@JsonProperty(FIELD_NAME_JSON_PLAN)
+	private final String jsonPlan;
+
+	@JsonCreator
+	public JobDetailsInfo(
+			@JsonDeserialize(using = JobIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId,
+			@JsonProperty(FIELD_NAME_JOB_NAME) String name,
+			@JsonProperty(FIELD_NAME_IS_STOPPABLE) boolean isStoppable,
+			@JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus,
+			@JsonProperty(FIELD_NAME_START_TIME) long startTime,
+			@JsonProperty(FIELD_NAME_END_TIME) long endTime,
+			@JsonProperty(FIELD_NAME_DURATION) long duration,
+			@JsonProperty(FIELD_NAME_NOW) long now,
+			@JsonProperty(FIELD_NAME_TIMESTAMPS) Map<JobStatus, Long> timestamps,
+			@JsonProperty(FIELD_NAME_JOB_VERTEX_INFOS) Collection<JobVertexDetailsInfo> jobVertexInfos,
+			@JsonProperty(FIELD_NAME_JOB_VERTICES_PER_STATE) Map<ExecutionState, Integer> jobVerticesPerState,
+			@JsonProperty(FIELD_NAME_JSON_PLAN) String jsonPlan) {
+		this.jobId = Preconditions.checkNotNull(jobId);
+		this.name = Preconditions.checkNotNull(name);
+		this.isStoppable = isStoppable;
+		this.jobStatus = Preconditions.checkNotNull(jobStatus);
+		this.startTime = startTime;
+		this.endTime = endTime;
+		this.duration = duration;
+		this.now = now;
+		this.timestamps = Preconditions.checkNotNull(timestamps);
+		this.jobVertexInfos = Preconditions.checkNotNull(jobVertexInfos);
+		this.jobVerticesPerState = Preconditions.checkNotNull(jobVerticesPerState);
+		this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		JobDetailsInfo that = (JobDetailsInfo) o;
+		return isStoppable == that.isStoppable &&
+			startTime == that.startTime &&
+			endTime == that.endTime &&
+			duration == that.duration &&
+			now == that.now &&
+			Objects.equals(jobId, that.jobId) &&
+			Objects.equals(name, that.name) &&
+			jobStatus == that.jobStatus &&
+			Objects.equals(timestamps, that.timestamps) &&
+			Objects.equals(jobVertexInfos, that.jobVertexInfos) &&
+			Objects.equals(jobVerticesPerState, that.jobVerticesPerState) &&
+			Objects.equals(jsonPlan, that.jsonPlan);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(jobId, name, isStoppable, jobStatus, startTime, endTime, duration, now, timestamps, jobVertexInfos, jobVerticesPerState, jsonPlan);
+	}
+
+	// ---------------------------------------------------
+	// Static inner classes
+	// ---------------------------------------------------
+
+	/**
+	 * Detailed information about a job vertex.
+	 */
+	public static final class JobVertexDetailsInfo {
+
+		public static final String FIELD_NAME_JOB_VERTEX_ID = "id";
+
+		public static final String FIELD_NAME_JOB_VERTEX_NAME = "name";
+
+		public static final String FIELD_NAME_PARALLELISM = "parallelism";
+
+		public static final String FIELD_NAME_JOB_VERTEX_STATE = "status";
+
+		public static final String FIELD_NAME_JOB_VERTEX_START_TIME = "start-time";
+
+		public static final String FIELD_NAME_JOB_VERTEX_END_TIME = "end-time";
+
+		public static final String FIELD_NAME_JOB_VERTEX_DURATION = "duration";
+
+		public static final String FIELD_NAME_TASKS_PER_STATE = "tasks";
+
+		public static final String FIELD_NAME_JOB_VERTEX_METRICS = "metrics";
+
+		@JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
+		@JsonSerialize(using = JobVertexIDSerializer.class)
+		private final JobVertexID jobVertexID;
+
+		@JsonProperty(FIELD_NAME_JOB_VERTEX_NAME)
+		private final String name;
+
+		@JsonProperty(FIELD_NAME_PARALLELISM)
+		private final int parallelism;
+
+		@JsonProperty(FIELD_NAME_JOB_VERTEX_STATE)
+		private final ExecutionState executionState;
+
+		@JsonProperty(FIELD_NAME_JOB_VERTEX_START_TIME)
+		private final long startTime;
+
+		@JsonProperty(FIELD_NAME_JOB_VERTEX_END_TIME)
+		private final long endTime;
+
+		@JsonProperty(FIELD_NAME_JOB_VERTEX_DURATION)
+		private final long duration;
+
+		@JsonProperty(FIELD_NAME_TASKS_PER_STATE)
+		private final Map<ExecutionState, Integer> tasksPerState;
+
+		@JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS)
+		private final JobVertexMetrics jobVertexMetrics;
+
+		@JsonCreator
+		public JobVertexDetailsInfo(
+				@JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_VERTEX_ID) JobVertexID jobVertexID,
+				@JsonProperty(FIELD_NAME_JOB_VERTEX_NAME) String name,
+				@JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
+				@JsonProperty(FIELD_NAME_JOB_VERTEX_STATE) ExecutionState executionState,
+				@JsonProperty(FIELD_NAME_JOB_VERTEX_START_TIME) long startTime,
+				@JsonProperty(FIELD_NAME_JOB_VERTEX_END_TIME) long endTime,
+				@JsonProperty(FIELD_NAME_JOB_VERTEX_DURATION) long duration,
+				@JsonProperty(FIELD_NAME_TASKS_PER_STATE) Map<ExecutionState, Integer> tasksPerState,
+				@JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) JobVertexMetrics jobVertexMetrics) {
+			this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
+			this.name = Preconditions.checkNotNull(name);
+			this.parallelism = parallelism;
+			this.executionState = Preconditions.checkNotNull(executionState);
+			this.startTime = startTime;
+			this.endTime = endTime;
+			this.duration = duration;
+			this.tasksPerState = Preconditions.checkNotNull(tasksPerState);
+			this.jobVertexMetrics = Preconditions.checkNotNull(jobVertexMetrics);
+		}
+
+		public JobVertexID getJobVertexID() {
+			return jobVertexID;
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		public int getParallelism() {
+			return parallelism;
+		}
+
+		public ExecutionState getExecutionState() {
+			return executionState;
+		}
+
+		public long getStartTime() {
+			return startTime;
+		}
+
+		public long getEndTime() {
+			return endTime;
+		}
+
+		public long getDuration() {
+			return duration;
+		}
+
+		public Map<ExecutionState, Integer> getTasksPerState() {
+			return tasksPerState;
+		}
+
+		public JobVertexMetrics getJobVertexMetrics() {
+			return jobVertexMetrics;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			JobVertexDetailsInfo that = (JobVertexDetailsInfo) o;
+			return parallelism == that.parallelism &&
+				startTime == that.startTime &&
+				endTime == that.endTime &&
+				duration == that.duration &&
+				Objects.equals(jobVertexID, that.jobVertexID) &&
+				Objects.equals(name, that.name) &&
+				executionState == that.executionState &&
+				Objects.equals(tasksPerState, that.tasksPerState) &&
+				Objects.equals(jobVertexMetrics, that.jobVertexMetrics);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(jobVertexID, name, parallelism, executionState, startTime, endTime, duration, tasksPerState, jobVertexMetrics);
+		}
+	}
+
+	/**
+	 * Metrics of a job vertex.
+	 */
+	public static final class JobVertexMetrics {
+
+		public static final String FIELD_NAME_BYTES_READ = "read-bytes";
+
+		public static final String FIELD_NAME_BYTES_READ_COMPLETE = "read-bytes-complete";
+
+		public static final String FIELD_NAME_BYTES_WRITTEN = "write-bytes";
+
+		public static final String FIELD_NAME_BYTES_WRITTEN_COMPLETE = "write-bytes-complete";
+
+		public static final String FIELD_NAME_RECORDS_READ = "read-records";
+
+		public static final String FIELD_NAME_RECORDS_READ_COMPLETE = "read-records-complete";
+
+		public static final String FIELD_NAME_RECORDS_WRITTEN = "write-records";
+
+		public static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = "write-records-complete";
+
+		@JsonProperty(FIELD_NAME_BYTES_READ)
+		private final long bytesRead;
+
+		@JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE)
+		private final boolean bytesReadComplete;
+
+		@JsonProperty(FIELD_NAME_BYTES_WRITTEN)
+		private final long bytesWritten;
+
+		@JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE)
+		private final boolean bytesWrittenComplete;
+
+		@JsonProperty(FIELD_NAME_RECORDS_READ)
+		private final long recordsRead;
+
+		@JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE)
+		private final boolean recordsReadComplete;
+
+		@JsonProperty(FIELD_NAME_RECORDS_WRITTEN)
+		private final long recordsWritten;
+
+		@JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE)
+		private final boolean recordsWrittenComplete;
+
+		@JsonCreator
+		public JobVertexMetrics(
+				@JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead,
+				@JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE) boolean bytesReadComplete,
+				@JsonProperty(FIELD_NAME_BYTES_WRITTEN) long bytesWritten,
+				@JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE) boolean bytesWrittenComplete,
+				@JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead,
+				@JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean recordsReadComplete,
+				@JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten,
+				@JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete) {
+			this.bytesRead = bytesRead;
+			this.bytesReadComplete = bytesReadComplete;
+			this.bytesWritten = bytesWritten;
+			this.bytesWrittenComplete = bytesWrittenComplete;
+			this.recordsRead = recordsRead;
+			this.recordsReadComplete = recordsReadComplete;
+			this.recordsWritten = recordsWritten;
+			this.recordsWrittenComplete = recordsWrittenComplete;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			JobVertexMetrics that = (JobVertexMetrics) o;
+			return bytesRead == that.bytesRead &&
+				bytesReadComplete == that.bytesReadComplete &&
+				bytesWritten == that.bytesWritten &&
+				bytesWrittenComplete == that.bytesWrittenComplete &&
+				recordsRead == that.recordsRead &&
+				recordsReadComplete == that.recordsReadComplete &&
+				recordsWritten == that.recordsWritten &&
+				recordsWrittenComplete == that.recordsWrittenComplete;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(bytesRead, bytesReadComplete, bytesWritten, bytesWrittenComplete, recordsRead, recordsReadComplete, recordsWritten, recordsWrittenComplete);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDDeserializer.java
new file mode 100644
index 0000000..228423d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDDeserializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Json deserializer for {@link JobID}.
+ */
+public class JobIDDeserializer extends StdDeserializer<JobID> {
+	private static final long serialVersionUID = -130167416771003559L;
+
+	protected JobIDDeserializer() {
+		super(JobID.class);
+	}
+
+	@Override
+	public JobID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+		return JobID.fromHexString(p.getValueAsString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDSerializer.java
new file mode 100644
index 0000000..e386423
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * Json serializer for {@link JobID}.
+ */
+public class JobIDSerializer extends StdSerializer<JobID> {
+
+	private static final long serialVersionUID = -6598593519161574611L;
+
+	protected JobIDSerializer() {
+		super(JobID.class);
+	}
+
+	@Override
+	public void serialize(JobID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+		gen.writeString(value.toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
index 00cfe4e..a43031e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
@@ -20,18 +20,25 @@ package org.apache.flink.runtime.rest.messages.json;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.KeyDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 
 import java.io.IOException;
 
 /**
  * Jackson deserializer for {@link JobVertexID}.
  */
-public class JobVertexIDDeserializer extends KeyDeserializer {
+public class JobVertexIDDeserializer extends StdDeserializer<JobVertexID> {
+
+	private static final long serialVersionUID = 3051901462549718924L;
+
+	protected JobVertexIDDeserializer() {
+		super(JobVertexID.class);
+	}
 
 	@Override
-	public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException {
-		return JobVertexID.fromHexString(key);
+	public JobVertexID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+		return JobVertexID.fromHexString(p.getValueAsString());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeyDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeyDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeyDeserializer.java
new file mode 100644
index 0000000..5cd2f22
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeyDeserializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.KeyDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson deserializer for {@link JobVertexID}.
+ */
+public class JobVertexIDKeyDeserializer extends KeyDeserializer {
+
+	@Override
+	public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException {
+		return JobVertexID.fromHexString(key);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeySerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeySerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeySerializer.java
new file mode 100644
index 0000000..c98b154
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeySerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson serializer for {@link JobVertexID} used as a key serializer.
+ */
+public class JobVertexIDKeySerializer extends StdSerializer<JobVertexID> {
+
+	private static final long serialVersionUID = 2970050507628933522L;
+
+	public JobVertexIDKeySerializer() {
+		super(JobVertexID.class);
+	}
+
+	@Override
+	public void serialize(JobVertexID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+		gen.writeFieldName(value.toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
index 2e53e52..f3703bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
@@ -31,7 +31,7 @@ import java.io.IOException;
  */
 public class JobVertexIDSerializer extends StdSerializer<JobVertexID> {
 
-	private static final long serialVersionUID = 2970050507628933522L;
+	private static final long serialVersionUID = -2339350570828548335L;
 
 	public JobVertexIDSerializer() {
 		super(JobVertexID.class);
@@ -39,6 +39,6 @@ public class JobVertexIDSerializer extends StdSerializer<JobVertexID> {
 
 	@Override
 	public void serialize(JobVertexID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
-		gen.writeFieldName(value.toString());
+		gen.writeString(value.toString());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
new file mode 100644
index 0000000..5e2e09d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Tests (un)marshalling of the {@link JobDetailsInfo}.
+ */
+public class JobDetailsInfoTest extends RestResponseMarshallingTestBase<JobDetailsInfo> {
+
+	@Override
+	protected Class<JobDetailsInfo> getTestResponseClass() {
+		return JobDetailsInfo.class;
+	}
+
+	@Override
+	protected JobDetailsInfo getTestResponseInstance() throws Exception {
+		final Random random = new Random();
+		final int numJobVertexDetailsInfos = 4;
+		final String jsonPlan = "{id: \"1234\"}";
+
+		final Map<JobStatus, Long> timestamps = new HashMap<>(JobStatus.values().length);
+		final Collection<JobDetailsInfo.JobVertexDetailsInfo> jobVertexInfos = new ArrayList<>(numJobVertexDetailsInfos);
+		final Map<ExecutionState, Integer> jobVerticesPerState = new HashMap<>(ExecutionState.values().length);
+
+		for (JobStatus jobStatus : JobStatus.values()) {
+			timestamps.put(jobStatus, random.nextLong());
+		}
+
+		for (int i = 0; i < numJobVertexDetailsInfos; i++) {
+			jobVertexInfos.add(createJobVertexDetailsInfo(random));
+		}
+
+		for (ExecutionState executionState : ExecutionState.values()) {
+			jobVerticesPerState.put(executionState, random.nextInt());
+		}
+
+		return new JobDetailsInfo(
+			new JobID(),
+			"foobar",
+			true,
+			JobStatus.values()[random.nextInt(JobStatus.values().length)],
+			1L,
+			2L,
+			1L,
+			1984L,
+			timestamps,
+			jobVertexInfos,
+			jobVerticesPerState,
+			jsonPlan);
+	}
+
+	private JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(Random random) {
+		final Map<ExecutionState, Integer> tasksPerState = new HashMap<>(ExecutionState.values().length);
+		final JobDetailsInfo.JobVertexMetrics jobVertexMetrics = new JobDetailsInfo.JobVertexMetrics(
+			random.nextLong(),
+			random.nextBoolean(),
+			random.nextLong(),
+			random.nextBoolean(),
+			random.nextLong(),
+			random.nextBoolean(),
+			random.nextLong(),
+			random.nextBoolean());
+
+		for (ExecutionState executionState : ExecutionState.values()) {
+			tasksPerState.put(executionState, random.nextInt());
+		}
+
+		return new JobDetailsInfo.JobVertexDetailsInfo(
+			new JobVertexID(),
+			"jobVertex" + random.nextLong(),
+			random.nextInt(),
+			ExecutionState.values()[random.nextInt(ExecutionState.values().length)],
+			random.nextLong(),
+			random.nextLong(),
+			random.nextLong(),
+			tasksPerState,
+			jobVertexMetrics);
+	}
+}