You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:15 UTC
[17/30] flink git commit: [FLINK-7705] Add JobDetailsHandler
[FLINK-7705] Add JobDetailsHandler
Add JobID(De)Serializer and JobVertexID(De)Serializer for Jackson
This closes #4884.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de201a6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de201a6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de201a6c
Branch: refs/heads/master
Commit: de201a6c0f8739c12918817d2ef571abb2898f37
Parents: 1c78dee
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 3 09:59:17 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:43 2017 +0100
----------------------------------------------------------------------
.../dispatcher/DispatcherRestEndpoint.java | 14 +
.../rest/handler/job/JobDetailsHandler.java | 209 ++++++++++
.../rest/handler/util/MutableIOMetrics.java | 20 +
.../checkpoints/CheckpointStatistics.java | 12 +-
.../rest/messages/job/JobDetailsHeaders.java | 74 ++++
.../rest/messages/job/JobDetailsInfo.java | 392 +++++++++++++++++++
.../rest/messages/json/JobIDDeserializer.java | 43 ++
.../rest/messages/json/JobIDSerializer.java | 44 +++
.../messages/json/JobVertexIDDeserializer.java | 15 +-
.../json/JobVertexIDKeyDeserializer.java | 37 ++
.../messages/json/JobVertexIDKeySerializer.java | 44 +++
.../messages/json/JobVertexIDSerializer.java | 4 +-
.../rest/messages/job/JobDetailsInfoTest.java | 107 +++++
13 files changed, 1003 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 12187e3..e13fd5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
@@ -67,6 +68,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeader
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
@@ -289,6 +291,17 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
TaskManagerDetailsHeaders.getInstance(),
resourceManagerRetriever,
metricFetcher);
+
+ final JobDetailsHandler jobDetailsHandler = new JobDetailsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ JobDetailsHeaders.getInstance(),
+ executionGraphCache,
+ executor,
+ metricFetcher);
+
final File tmpDir = restConfiguration.getTmpDir();
Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -317,6 +330,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler));
handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
+ handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
new file mode 100644
index 0000000..0c0ee18
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler returning the details for the specified job.
+ */
+public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> {
+
+ private final MetricFetcher<?> metricFetcher;
+
+ public JobDetailsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor,
+ MetricFetcher<?> metricFetcher) {
+ super(
+ localRestAddress,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ messageHeaders,
+ executionGraphCache,
+ executor);
+
+ this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
+ }
+
+ @Override
+ protected JobDetailsInfo handleRequest(
+ HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
+ AccessExecutionGraph executionGraph) throws RestHandlerException {
+
+ final long now = System.currentTimeMillis();
+ final long startTime = executionGraph.getStatusTimestamp(JobStatus.CREATED);
+ final long endTime = executionGraph.getState().isGloballyTerminalState() ?
+ executionGraph.getStatusTimestamp(executionGraph.getState()) : -1L;
+ final long duration = (endTime > 0L ? endTime : now) - startTime;
+
+ final Map<JobStatus, Long> timestamps = new HashMap<>(JobStatus.values().length);
+
+ for (JobStatus jobStatus : JobStatus.values()) {
+ timestamps.put(jobStatus, executionGraph.getStatusTimestamp(jobStatus));
+ }
+
+ Collection<JobDetailsInfo.JobVertexDetailsInfo> jobVertexInfos = new ArrayList<>(executionGraph.getAllVertices().size());
+ int[] jobVerticesPerState = new int[ExecutionState.values().length];
+
+ for (AccessExecutionJobVertex accessExecutionJobVertex : executionGraph.getVerticesTopologically()) {
+ final JobDetailsInfo.JobVertexDetailsInfo vertexDetailsInfo = createJobVertexDetailsInfo(
+ accessExecutionJobVertex,
+ now,
+ executionGraph.getJobID(),
+ metricFetcher);
+
+ jobVertexInfos.add(vertexDetailsInfo);
+ jobVerticesPerState[vertexDetailsInfo.getExecutionState().ordinal()]++;
+ }
+
+ Map<ExecutionState, Integer> jobVerticesPerStateMap = new HashMap<>(ExecutionState.values().length);
+
+ for (ExecutionState executionState : ExecutionState.values()) {
+ jobVerticesPerStateMap.put(executionState, jobVerticesPerState[executionState.ordinal()]);
+ }
+
+ return new JobDetailsInfo(
+ executionGraph.getJobID(),
+ executionGraph.getJobName(),
+ executionGraph.isStoppable(),
+ executionGraph.getState(),
+ startTime,
+ endTime,
+ duration,
+ now,
+ timestamps,
+ jobVertexInfos,
+ jobVerticesPerStateMap,
+ executionGraph.getJsonPlan());
+ }
+
+ public static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
+ AccessExecutionJobVertex ejv,
+ long now,
+ JobID jobId,
+ MetricFetcher<?> metricFetcher) {
+ int[] tasksPerState = new int[ExecutionState.values().length];
+ long startTime = Long.MAX_VALUE;
+ long endTime = 0;
+ boolean allFinished = true;
+
+ for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
+ final ExecutionState state = vertex.getExecutionState();
+ tasksPerState[state.ordinal()]++;
+
+ // take the earliest start time
+ long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+ if (started > 0L) {
+ startTime = Math.min(startTime, started);
+ }
+
+ allFinished &= state.isTerminal();
+ endTime = Math.max(endTime, vertex.getStateTimestamp(state));
+ }
+
+ long duration;
+ if (startTime < Long.MAX_VALUE) {
+ if (allFinished) {
+ duration = endTime - startTime;
+ }
+ else {
+ endTime = -1L;
+ duration = now - startTime;
+ }
+ }
+ else {
+ startTime = -1L;
+ endTime = -1L;
+ duration = -1L;
+ }
+
+ ExecutionState jobVertexState =
+ ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism());
+
+ Map<ExecutionState, Integer> tasksPerStateMap = new HashMap<>(tasksPerState.length);
+
+ for (ExecutionState executionState : ExecutionState.values()) {
+ tasksPerStateMap.put(executionState, tasksPerState[executionState.ordinal()]);
+ }
+
+ MutableIOMetrics counts = new MutableIOMetrics();
+
+ for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
+ counts.addIOMetrics(
+ vertex.getCurrentExecutionAttempt(),
+ metricFetcher,
+ jobId.toString(),
+ ejv.getJobVertexId().toString());
+ }
+
+ final JobDetailsInfo.JobVertexMetrics jobVertexMetrics = new JobDetailsInfo.JobVertexMetrics(
+ counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
+ counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
+ counts.getNumBytesOut(),
+ counts.isNumBytesOutComplete(),
+ counts.getNumRecordsIn(),
+ counts.isNumRecordsInComplete(),
+ counts.getNumRecordsOut(),
+ counts.isNumRecordsOutComplete());
+
+ return new JobDetailsInfo.JobVertexDetailsInfo(
+ ejv.getJobVertexId(),
+ ejv.getName(),
+ ejv.getParallelism(),
+ jobVertexState,
+ startTime,
+ endTime,
+ duration,
+ tasksPerStateMap,
+ jobVertexMetrics);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
index 224e63d..1b172dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
@@ -54,6 +54,26 @@ public class MutableIOMetrics extends IOMetrics {
super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
}
+ public boolean isNumBytesInLocalComplete() {
+ return numBytesInLocalComplete;
+ }
+
+ public boolean isNumBytesInRemoteComplete() {
+ return numBytesInRemoteComplete;
+ }
+
+ public boolean isNumBytesOutComplete() {
+ return numBytesOutComplete;
+ }
+
+ public boolean isNumRecordsInComplete() {
+ return numRecordsInComplete;
+ }
+
+ public boolean isNumRecordsOutComplete() {
+ return numRecordsOutComplete;
+ }
+
/**
* Adds the IO metrics for the given attempt to this object. If the {@link AccessExecution} is in
* a terminal state the contained {@link IOMetrics} object is added. Otherwise the given {@link MetricFetcher} is
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
index a7793fb..333c016 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
-import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeyDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeySerializer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -106,7 +106,7 @@ public class CheckpointStatistics implements ResponseBody {
private final int numAckSubtasks;
@JsonProperty(FIELD_NAME_TASKS)
- @JsonSerialize(keyUsing = JobVertexIDSerializer.class)
+ @JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask;
@JsonCreator
@@ -121,7 +121,7 @@ public class CheckpointStatistics implements ResponseBody {
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
- @JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
+ @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
this.id = id;
this.status = Preconditions.checkNotNull(status);
this.savepoint = savepoint;
@@ -309,7 +309,7 @@ public class CheckpointStatistics implements ResponseBody {
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
- @JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
+ @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
super(
@@ -388,7 +388,7 @@ public class CheckpointStatistics implements ResponseBody {
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
- @JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
+ @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
super(
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsHeaders.java
new file mode 100644
index 0000000..bd0b730
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsHeaders.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobDetailsHandler}.
+ */
+public class JobDetailsHeaders implements MessageHeaders<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> {
+
+ private static final JobDetailsHeaders INSTANCE = new JobDetailsHeaders();
+
+ public static final String URL = "/jobs/:" + JobIDPathParameter.KEY;
+
+ private JobDetailsHeaders() {}
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<JobDetailsInfo> getResponseClass() {
+ return JobDetailsInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public JobMessageParameters getUnresolvedMessageParameters() {
+ return new JobMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static JobDetailsHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
new file mode 100644
index 0000000..e4d04d5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Details about a job.
+ */
+public class JobDetailsInfo implements ResponseBody {
+
+ public static final String FIELD_NAME_JOB_ID = "jid";
+
+ public static final String FIELD_NAME_JOB_NAME = "name";
+
+ public static final String FIELD_NAME_IS_STOPPABLE = "isStoppable";
+
+ public static final String FIELD_NAME_JOB_STATUS = "state";
+
+ public static final String FIELD_NAME_START_TIME = "start-time";
+
+ public static final String FIELD_NAME_END_TIME = "end-time";
+
+ public static final String FIELD_NAME_DURATION = "duration";
+
+ // TODO: Why do we need this field?
+ public static final String FIELD_NAME_NOW = "now";
+
+ public static final String FIELD_NAME_TIMESTAMPS = "timestamps";
+
+ public static final String FIELD_NAME_JOB_VERTEX_INFOS = "vertices";
+
+ public static final String FIELD_NAME_JOB_VERTICES_PER_STATE = "status-counts";
+
+ public static final String FIELD_NAME_JSON_PLAN = "plan";
+
+ @JsonProperty(FIELD_NAME_JOB_ID)
+ @JsonSerialize(using = JobIDSerializer.class)
+ private final JobID jobId;
+
+ @JsonProperty(FIELD_NAME_JOB_NAME)
+ private final String name;
+
+ @JsonProperty(FIELD_NAME_IS_STOPPABLE)
+ private final boolean isStoppable;
+
+ @JsonProperty(FIELD_NAME_JOB_STATUS)
+ private final JobStatus jobStatus;
+
+ @JsonProperty(FIELD_NAME_START_TIME)
+ private final long startTime;
+
+ @JsonProperty(FIELD_NAME_END_TIME)
+ private final long endTime;
+
+ @JsonProperty(FIELD_NAME_DURATION)
+ private final long duration;
+
+ @JsonProperty(FIELD_NAME_NOW)
+ private final long now;
+
+ @JsonProperty(FIELD_NAME_TIMESTAMPS)
+ private final Map<JobStatus, Long> timestamps;
+
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_INFOS)
+ private final Collection<JobVertexDetailsInfo> jobVertexInfos;
+
+ @JsonProperty(FIELD_NAME_JOB_VERTICES_PER_STATE)
+ private final Map<ExecutionState, Integer> jobVerticesPerState;
+
+ @JsonProperty(FIELD_NAME_JSON_PLAN)
+ private final String jsonPlan;
+
+ @JsonCreator
+ public JobDetailsInfo(
+ @JsonDeserialize(using = JobIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId,
+ @JsonProperty(FIELD_NAME_JOB_NAME) String name,
+ @JsonProperty(FIELD_NAME_IS_STOPPABLE) boolean isStoppable,
+ @JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus,
+ @JsonProperty(FIELD_NAME_START_TIME) long startTime,
+ @JsonProperty(FIELD_NAME_END_TIME) long endTime,
+ @JsonProperty(FIELD_NAME_DURATION) long duration,
+ @JsonProperty(FIELD_NAME_NOW) long now,
+ @JsonProperty(FIELD_NAME_TIMESTAMPS) Map<JobStatus, Long> timestamps,
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_INFOS) Collection<JobVertexDetailsInfo> jobVertexInfos,
+ @JsonProperty(FIELD_NAME_JOB_VERTICES_PER_STATE) Map<ExecutionState, Integer> jobVerticesPerState,
+ @JsonProperty(FIELD_NAME_JSON_PLAN) String jsonPlan) {
+ this.jobId = Preconditions.checkNotNull(jobId);
+ this.name = Preconditions.checkNotNull(name);
+ this.isStoppable = isStoppable;
+ this.jobStatus = Preconditions.checkNotNull(jobStatus);
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.duration = duration;
+ this.now = now;
+ this.timestamps = Preconditions.checkNotNull(timestamps);
+ this.jobVertexInfos = Preconditions.checkNotNull(jobVertexInfos);
+ this.jobVerticesPerState = Preconditions.checkNotNull(jobVerticesPerState);
+ this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobDetailsInfo that = (JobDetailsInfo) o;
+ return isStoppable == that.isStoppable &&
+ startTime == that.startTime &&
+ endTime == that.endTime &&
+ duration == that.duration &&
+ now == that.now &&
+ Objects.equals(jobId, that.jobId) &&
+ Objects.equals(name, that.name) &&
+ jobStatus == that.jobStatus &&
+ Objects.equals(timestamps, that.timestamps) &&
+ Objects.equals(jobVertexInfos, that.jobVertexInfos) &&
+ Objects.equals(jobVerticesPerState, that.jobVerticesPerState) &&
+ Objects.equals(jsonPlan, that.jsonPlan);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobId, name, isStoppable, jobStatus, startTime, endTime, duration, now, timestamps, jobVertexInfos, jobVerticesPerState, jsonPlan);
+ }
+
+ // ---------------------------------------------------
+ // Static inner classes
+ // ---------------------------------------------------
+
+ /**
+ * Detailed information about a job vertex.
+ */
+ public static final class JobVertexDetailsInfo {
+
+ public static final String FIELD_NAME_JOB_VERTEX_ID = "id";
+
+ public static final String FIELD_NAME_JOB_VERTEX_NAME = "name";
+
+ public static final String FIELD_NAME_PARALLELISM = "parallelism";
+
+ public static final String FIELD_NAME_JOB_VERTEX_STATE = "status";
+
+ public static final String FIELD_NAME_JOB_VERTEX_START_TIME = "start-time";
+
+ public static final String FIELD_NAME_JOB_VERTEX_END_TIME = "end-time";
+
+ public static final String FIELD_NAME_JOB_VERTEX_DURATION = "duration";
+
+ public static final String FIELD_NAME_TASKS_PER_STATE = "tasks";
+
+ public static final String FIELD_NAME_JOB_VERTEX_METRICS = "metrics";
+
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
+ @JsonSerialize(using = JobVertexIDSerializer.class)
+ private final JobVertexID jobVertexID;
+
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_NAME)
+ private final String name;
+
+ @JsonProperty(FIELD_NAME_PARALLELISM)
+ private final int parallelism;
+
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_STATE)
+ private final ExecutionState executionState;
+
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_START_TIME)
+ private final long startTime;
+
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_END_TIME)
+ private final long endTime;
+
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_DURATION)
+ private final long duration;
+
+ @JsonProperty(FIELD_NAME_TASKS_PER_STATE)
+ private final Map<ExecutionState, Integer> tasksPerState;
+
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS)
+ private final JobVertexMetrics jobVertexMetrics;
+
+ @JsonCreator
+ public JobVertexDetailsInfo(
+ @JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_VERTEX_ID) JobVertexID jobVertexID,
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_NAME) String name,
+ @JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_STATE) ExecutionState executionState,
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_START_TIME) long startTime,
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_END_TIME) long endTime,
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_DURATION) long duration,
+ @JsonProperty(FIELD_NAME_TASKS_PER_STATE) Map<ExecutionState, Integer> tasksPerState,
+ @JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) JobVertexMetrics jobVertexMetrics) {
+ this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
+ this.name = Preconditions.checkNotNull(name);
+ this.parallelism = parallelism;
+ this.executionState = Preconditions.checkNotNull(executionState);
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.duration = duration;
+ this.tasksPerState = Preconditions.checkNotNull(tasksPerState);
+ this.jobVertexMetrics = Preconditions.checkNotNull(jobVertexMetrics);
+ }
+
+ public JobVertexID getJobVertexID() {
+ return jobVertexID;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ public ExecutionState getExecutionState() {
+ return executionState;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public Map<ExecutionState, Integer> getTasksPerState() {
+ return tasksPerState;
+ }
+
+ public JobVertexMetrics getJobVertexMetrics() {
+ return jobVertexMetrics;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobVertexDetailsInfo that = (JobVertexDetailsInfo) o;
+ return parallelism == that.parallelism &&
+ startTime == that.startTime &&
+ endTime == that.endTime &&
+ duration == that.duration &&
+ Objects.equals(jobVertexID, that.jobVertexID) &&
+ Objects.equals(name, that.name) &&
+ executionState == that.executionState &&
+ Objects.equals(tasksPerState, that.tasksPerState) &&
+ Objects.equals(jobVertexMetrics, that.jobVertexMetrics);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobVertexID, name, parallelism, executionState, startTime, endTime, duration, tasksPerState, jobVertexMetrics);
+ }
+ }
+
+ /**
+ * I/O metrics of a job vertex: bytes and records read/written, each paired with a boolean "-complete" flag; bound to JSON through the Jackson-annotated fields and creator constructor.
+ */
+ public static final class JobVertexMetrics {
+
+ public static final String FIELD_NAME_BYTES_READ = "read-bytes";
+
+ public static final String FIELD_NAME_BYTES_READ_COMPLETE = "read-bytes-complete";
+
+ public static final String FIELD_NAME_BYTES_WRITTEN = "write-bytes";
+
+ public static final String FIELD_NAME_BYTES_WRITTEN_COMPLETE = "write-bytes-complete";
+
+ public static final String FIELD_NAME_RECORDS_READ = "read-records";
+
+ public static final String FIELD_NAME_RECORDS_READ_COMPLETE = "read-records-complete";
+
+ public static final String FIELD_NAME_RECORDS_WRITTEN = "write-records";
+
+ public static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = "write-records-complete";
+
+ @JsonProperty(FIELD_NAME_BYTES_READ)
+ private final long bytesRead;
+
+ @JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE)
+ private final boolean bytesReadComplete;
+
+ @JsonProperty(FIELD_NAME_BYTES_WRITTEN)
+ private final long bytesWritten;
+
+ @JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE)
+ private final boolean bytesWrittenComplete;
+
+ @JsonProperty(FIELD_NAME_RECORDS_READ)
+ private final long recordsRead;
+
+ @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE)
+ private final boolean recordsReadComplete;
+
+ @JsonProperty(FIELD_NAME_RECORDS_WRITTEN)
+ private final long recordsWritten;
+
+ @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE)
+ private final boolean recordsWrittenComplete;
+
+ @JsonCreator
+ public JobVertexMetrics(
+ @JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead,
+ @JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE) boolean bytesReadComplete,
+ @JsonProperty(FIELD_NAME_BYTES_WRITTEN) long bytesWritten,
+ @JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE) boolean bytesWrittenComplete,
+ @JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead,
+ @JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean recordsReadComplete,
+ @JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten,
+ @JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete) {
+ this.bytesRead = bytesRead;
+ this.bytesReadComplete = bytesReadComplete;
+ this.bytesWritten = bytesWritten;
+ this.bytesWrittenComplete = bytesWrittenComplete;
+ this.recordsRead = recordsRead;
+ this.recordsReadComplete = recordsReadComplete;
+ this.recordsWritten = recordsWritten;
+ this.recordsWrittenComplete = recordsWrittenComplete;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobVertexMetrics that = (JobVertexMetrics) o;
+ return bytesRead == that.bytesRead &&
+ bytesReadComplete == that.bytesReadComplete &&
+ bytesWritten == that.bytesWritten &&
+ bytesWrittenComplete == that.bytesWrittenComplete &&
+ recordsRead == that.recordsRead &&
+ recordsReadComplete == that.recordsReadComplete &&
+ recordsWritten == that.recordsWritten &&
+ recordsWrittenComplete == that.recordsWrittenComplete;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bytesRead, bytesReadComplete, bytesWritten, bytesWrittenComplete, recordsRead, recordsReadComplete, recordsWritten, recordsWrittenComplete);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDDeserializer.java
new file mode 100644
index 0000000..228423d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDDeserializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson deserializer which reads a {@link JobID} from the parser's current value, taken as a string and parsed with {@code JobID.fromHexString}.
+ */
+public class JobIDDeserializer extends StdDeserializer<JobID> {
+ private static final long serialVersionUID = -130167416771003559L;
+
+ protected JobIDDeserializer() {
+ super(JobID.class);
+ }
+
+ @Override
+ public JobID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+ return JobID.fromHexString(p.getValueAsString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDSerializer.java
new file mode 100644
index 0000000..e386423
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobIDSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson serializer which writes a {@link JobID} as a JSON string value using its {@code toString()} representation.
+ */
+public class JobIDSerializer extends StdSerializer<JobID> {
+
+ private static final long serialVersionUID = -6598593519161574611L;
+
+ protected JobIDSerializer() {
+ super(JobID.class);
+ }
+
+ @Override
+ public void serialize(JobID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+ gen.writeString(value.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
index 00cfe4e..a43031e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
@@ -20,18 +20,25 @@ package org.apache.flink.runtime.rest.messages.json;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.KeyDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import java.io.IOException;
/**
* Jackson deserializer for {@link JobVertexID}.
*/
-public class JobVertexIDDeserializer extends KeyDeserializer {
+public class JobVertexIDDeserializer extends StdDeserializer<JobVertexID> {
+
+ private static final long serialVersionUID = 3051901462549718924L;
+
+ protected JobVertexIDDeserializer() {
+ super(JobVertexID.class);
+ }
@Override
- public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException {
- return JobVertexID.fromHexString(key);
+ public JobVertexID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+ return JobVertexID.fromHexString(p.getValueAsString());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeyDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeyDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeyDeserializer.java
new file mode 100644
index 0000000..5cd2f22
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeyDeserializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.KeyDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson key deserializer which parses a map key string into a {@link JobVertexID} via {@code JobVertexID.fromHexString}.
+ */
+public class JobVertexIDKeyDeserializer extends KeyDeserializer {
+
+ @Override
+ public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException {
+ return JobVertexID.fromHexString(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeySerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeySerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeySerializer.java
new file mode 100644
index 0000000..c98b154
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDKeySerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson key serializer which writes a {@link JobVertexID} as a JSON field name (not a string value) using its {@code toString()} representation.
+ */
+public class JobVertexIDKeySerializer extends StdSerializer<JobVertexID> {
+
+ private static final long serialVersionUID = 2970050507628933522L;
+
+ public JobVertexIDKeySerializer() {
+ super(JobVertexID.class);
+ }
+
+ @Override
+ public void serialize(JobVertexID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+ gen.writeFieldName(value.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
index 2e53e52..f3703bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
@@ -31,7 +31,7 @@ import java.io.IOException;
*/
public class JobVertexIDSerializer extends StdSerializer<JobVertexID> {
- private static final long serialVersionUID = 2970050507628933522L;
+ private static final long serialVersionUID = -2339350570828548335L;
public JobVertexIDSerializer() {
super(JobVertexID.class);
@@ -39,6 +39,6 @@ public class JobVertexIDSerializer extends StdSerializer<JobVertexID> {
@Override
public void serialize(JobVertexID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
- gen.writeFieldName(value.toString());
+ gen.writeString(value.toString());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de201a6c/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
new file mode 100644
index 0000000..5e2e09d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Tests (un)marshalling of the {@link JobDetailsInfo}, using an instance filled with randomly generated timestamps, job vertex details and per-state counts.
+ */
+public class JobDetailsInfoTest extends RestResponseMarshallingTestBase<JobDetailsInfo> {
+
+ @Override
+ protected Class<JobDetailsInfo> getTestResponseClass() {
+ return JobDetailsInfo.class;
+ }
+
+ @Override
+ protected JobDetailsInfo getTestResponseInstance() throws Exception {
+ final Random random = new Random(); // NOTE(review): unseeded, so the generated fixture differs between runs
+ final int numJobVertexDetailsInfos = 4;
+ final String jsonPlan = "{id: \"1234\"}";
+
+ final Map<JobStatus, Long> timestamps = new HashMap<>(JobStatus.values().length);
+ final Collection<JobDetailsInfo.JobVertexDetailsInfo> jobVertexInfos = new ArrayList<>(numJobVertexDetailsInfos);
+ final Map<ExecutionState, Integer> jobVerticesPerState = new HashMap<>(ExecutionState.values().length);
+
+ for (JobStatus jobStatus : JobStatus.values()) {
+ timestamps.put(jobStatus, random.nextLong());
+ }
+
+ for (int i = 0; i < numJobVertexDetailsInfos; i++) {
+ jobVertexInfos.add(createJobVertexDetailsInfo(random));
+ }
+
+ for (ExecutionState executionState : ExecutionState.values()) {
+ jobVerticesPerState.put(executionState, random.nextInt());
+ }
+
+ return new JobDetailsInfo(
+ new JobID(),
+ "foobar",
+ true,
+ JobStatus.values()[random.nextInt(JobStatus.values().length)],
+ 1L,
+ 2L,
+ 1L,
+ 1984L,
+ timestamps,
+ jobVertexInfos,
+ jobVerticesPerState,
+ jsonPlan);
+ }
+
+ private JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(Random random) {
+ final Map<ExecutionState, Integer> tasksPerState = new HashMap<>(ExecutionState.values().length);
+ final JobDetailsInfo.JobVertexMetrics jobVertexMetrics = new JobDetailsInfo.JobVertexMetrics(
+ random.nextLong(),
+ random.nextBoolean(),
+ random.nextLong(),
+ random.nextBoolean(),
+ random.nextLong(),
+ random.nextBoolean(),
+ random.nextLong(),
+ random.nextBoolean());
+
+ for (ExecutionState executionState : ExecutionState.values()) {
+ tasksPerState.put(executionState, random.nextInt());
+ }
+
+ return new JobDetailsInfo.JobVertexDetailsInfo(
+ new JobVertexID(),
+ "jobVertex" + random.nextLong(),
+ random.nextInt(),
+ ExecutionState.values()[random.nextInt(ExecutionState.values().length)],
+ random.nextLong(),
+ random.nextLong(),
+ random.nextLong(),
+ tasksPerState,
+ jobVertexMetrics);
+ }
+}