You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/11/22 08:22:39 UTC
[flink] branch master updated: [FLINK-29423][rest] Remove custom JobDetails serializer
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ec89c17da5d [FLINK-29423][rest] Remove custom JobDetails serializer
ec89c17da5d is described below
commit ec89c17da5d555ca44f89d3f8739fa9ae00734b7
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Nov 11 13:55:40 2022 +0100
[FLINK-29423][rest] Remove custom JobDetails serializer
---
.../shortcodes/generated/rest_v1_dispatcher.html | 33 +++-
docs/static/generated/rest_v1_dispatcher.yml | 41 ++---
.../src/test/resources/rest_api_v1.snapshot | 33 +++-
.../runtime/messages/webmonitor/JobDetails.java | 167 +++++++++------------
4 files changed, 141 insertions(+), 133 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index ac1c58c1299..baa681d261b 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -1096,7 +1096,38 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
"jobs" : {
"type" : "array",
"items" : {
- "type" : "any"
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails",
+ "properties" : {
+ "duration" : {
+ "type" : "integer"
+ },
+ "end-time" : {
+ "type" : "integer"
+ },
+ "jid" : {
+ "type" : "any"
+ },
+ "last-modification" : {
+ "type" : "integer"
+ },
+ "name" : {
+ "type" : "string"
+ },
+ "start-time" : {
+ "type" : "integer"
+ },
+ "state" : {
+ "type" : "string",
+ "enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
+ },
+ "tasks" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "integer"
+ }
+ }
+ }
}
}
}
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index c1dcc55d097..146b66469f3 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -1886,18 +1886,6 @@ components:
total:
type: integer
format: int64
- CurrentAttempts:
- type: object
- properties:
- currentAttempts:
- uniqueItems: true
- type: array
- items:
- type: integer
- format: int32
- representativeAttempt:
- type: integer
- format: int32
DashboardConfiguration:
type: object
properties:
@@ -2218,36 +2206,27 @@ components:
JobDetails:
type: object
properties:
- currentExecutionAttempts:
- type: object
- additionalProperties:
- type: object
- additionalProperties:
- $ref: '#/components/schemas/CurrentAttempts'
duration:
type: integer
format: int64
- endTime:
+ end-time:
type: integer
format: int64
- jobId:
+ jid:
$ref: '#/components/schemas/JobID'
- jobName:
- type: string
- lastUpdateTime:
+ last-modification:
type: integer
format: int64
- numTasks:
- type: integer
- format: int32
- startTime:
+ name:
+ type: string
+ start-time:
type: integer
format: int64
- status:
+ state:
$ref: '#/components/schemas/JobStatus'
- tasksPerState:
- type: array
- items:
+ tasks:
+ type: object
+ additionalProperties:
type: integer
format: int32
JobDetailsInfo:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index ea31ca3d138..0ae17a6ce11 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -786,7 +786,38 @@
"jobs" : {
"type" : "array",
"items" : {
- "type" : "any"
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails",
+ "properties" : {
+ "jid" : {
+ "type" : "any"
+ },
+ "name" : {
+ "type" : "string"
+ },
+ "start-time" : {
+ "type" : "integer"
+ },
+ "end-time" : {
+ "type" : "integer"
+ },
+ "duration" : {
+ "type" : "integer"
+ },
+ "state" : {
+ "type" : "string",
+ "enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
+ },
+ "last-modification" : {
+ "type" : "integer"
+ },
+ "tasks" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "integer"
+ }
+ }
+ }
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index 04fce9483c0..ed53e2014fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -26,23 +26,21 @@ import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -50,8 +48,6 @@ import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** An actor message with a detailed overview of the current status of a job. */
-@JsonSerialize(using = JobDetails.JobDetailsSerializer.class)
-@JsonDeserialize(using = JobDetails.JobDetailsDeserializer.class)
public class JobDetails implements Serializable {
private static final long serialVersionUID = -3391462110304948766L;
@@ -64,6 +60,7 @@ public class JobDetails implements Serializable {
private static final String FIELD_NAME_STATUS = "state";
private static final String FIELD_NAME_LAST_MODIFICATION = "last-modification";
private static final String FIELD_NAME_TOTAL_NUMBER_TASKS = "total";
+ private static final String FIELD_NAME_TASKS = "tasks";
private final JobID jobId;
@@ -83,6 +80,8 @@ public class JobDetails implements Serializable {
private final int numTasks;
+ private transient Map<String, Integer> lazyTaskInfo = null;
+
/**
* The map holds the attempt number of the current execution attempt in the Execution, which is
* considered as the representing execution for the subtask of the vertex. The keys and values
@@ -93,6 +92,29 @@ public class JobDetails implements Serializable {
*/
private final Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts;
+    /**
+     * Creates the job details from their JSON representation.
+     *
+     * <p>The per-state task counts and the total number of tasks are both read from {@code
+     * taskInfo}: the total is stored under the key {@code "total"}, the per-state counts under the
+     * lower-cased {@link ExecutionState} names.
+     *
+     * @param jobId ID of the job
+     * @param jobName name of the job
+     * @param startTime value of the {@code start-time} field
+     * @param endTime value of the {@code end-time} field
+     * @param duration value of the {@code duration} field
+     * @param status status of the job
+     * @param lastUpdateTime value of the {@code last-modification} field
+     * @param taskInfo content of the {@code tasks} field; must not be null and must contain an
+     *     entry for {@code "total"}
+     * @throws NullPointerException if {@code taskInfo} is null or has no {@code "total"} entry
+     */
+    @JsonCreator
+    public JobDetails(
+            @JsonProperty(FIELD_NAME_JOB_ID) @JsonDeserialize(using = JobIDDeserializer.class)
+                    JobID jobId,
+            @JsonProperty(FIELD_NAME_JOB_NAME) String jobName,
+            @JsonProperty(FIELD_NAME_START_TIME) long startTime,
+            @JsonProperty(FIELD_NAME_END_TIME) long endTime,
+            @JsonProperty(FIELD_NAME_DURATION) long duration,
+            @JsonProperty(FIELD_NAME_STATUS) JobStatus status,
+            @JsonProperty(FIELD_NAME_LAST_MODIFICATION) long lastUpdateTime,
+            @JsonProperty(FIELD_NAME_TASKS) Map<String, Integer> taskInfo) {
+        this(
+                jobId,
+                jobName,
+                startTime,
+                endTime,
+                duration,
+                status,
+                lastUpdateTime,
+                // Fail with a descriptive message instead of an anonymous NPE when the payload
+                // lacks the "tasks" object or its "total" entry.
+                extractNumTasksPerState(
+                        checkNotNull(
+                                taskInfo, "The '" + FIELD_NAME_TASKS + "' field must not be null.")),
+                checkNotNull(
+                        taskInfo.get(FIELD_NAME_TOTAL_NUMBER_TASKS),
+                        "The '"
+                                + FIELD_NAME_TASKS
+                                + "' field must contain an entry for '"
+                                + FIELD_NAME_TOTAL_NUMBER_TASKS
+                                + "'."));
+    }
+
@VisibleForTesting
public JobDetails(
JobID jobId,
@@ -197,47 +219,82 @@ public class JobDetails implements Serializable {
// ------------------------------------------------------------------------
+ @JsonProperty(FIELD_NAME_JOB_ID)
+ @JsonSerialize(using = JobIDSerializer.class)
public JobID getJobId() {
return jobId;
}
+ @JsonProperty(FIELD_NAME_JOB_NAME)
public String getJobName() {
return jobName;
}
+ @JsonProperty(FIELD_NAME_START_TIME)
public long getStartTime() {
return startTime;
}
+ @JsonProperty(FIELD_NAME_END_TIME)
public long getEndTime() {
return endTime;
}
+ @JsonProperty(FIELD_NAME_DURATION)
public long getDuration() {
return duration;
}
+ @JsonProperty(FIELD_NAME_STATUS)
public JobStatus getStatus() {
return status;
}
+ @JsonProperty(FIELD_NAME_LAST_MODIFICATION)
public long getLastUpdateTime() {
return lastUpdateTime;
}
+    /**
+     * Returns the task counts in the shape of the {@code tasks} JSON field.
+     *
+     * <p>The map holds the total number of tasks under the key {@code "total"} plus one entry per
+     * {@link ExecutionState}, keyed by the lower-cased state name. It is built on first access and
+     * cached in a transient field afterwards.
+     *
+     * @return mapping from {@code "total"} or a lower-cased execution state name to a task count
+     */
+    @JsonProperty(FIELD_NAME_TASKS)
+    public Map<String, Integer> getTaskInfo() {
+        if (lazyTaskInfo == null) {
+            final Map<String, Integer> taskInfo = new HashMap<>();
+            taskInfo.put(FIELD_NAME_TOTAL_NUMBER_TASKS, getNumTasks());
+            for (ExecutionState executionState : ExecutionState.values()) {
+                taskInfo.put(
+                        // Locale.ROOT keeps the keys independent of the JVM default locale (a
+                        // Turkish locale would lower-case 'I' to a dotless i) and matches the
+                        // lookup done in extractNumTasksPerState.
+                        executionState.name().toLowerCase(Locale.ROOT),
+                        tasksPerState[executionState.ordinal()]);
+            }
+            lazyTaskInfo = taskInfo;
+        }
+        return lazyTaskInfo;
+    }
+
+ @JsonIgnore
public int getNumTasks() {
return numTasks;
}
+ @JsonIgnore
public int[] getTasksPerState() {
return tasksPerState;
}
+ @JsonIgnore
public Map<String, Map<Integer, CurrentAttempts>> getCurrentExecutionAttempts() {
return currentExecutionAttempts;
}
// ------------------------------------------------------------------------
+ private static int[] extractNumTasksPerState(Map<String, Integer> ex) {
+ int[] tasksPerState = new int[ExecutionState.values().length];
+ for (ExecutionState value : ExecutionState.values()) {
+ tasksPerState[value.ordinal()] =
+ ex.getOrDefault(value.name().toLowerCase(Locale.ROOT), 0);
+ }
+ return tasksPerState;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -296,96 +353,6 @@ public class JobDetails implements Serializable {
+ '}';
}
- public static final class JobDetailsSerializer extends StdSerializer<JobDetails> {
- private static final long serialVersionUID = 7915913423515194428L;
-
- public JobDetailsSerializer() {
- super(JobDetails.class);
- }
-
- @Override
- public void serialize(
- JobDetails jobDetails,
- JsonGenerator jsonGenerator,
- SerializerProvider serializerProvider)
- throws IOException {
- jsonGenerator.writeStartObject();
-
- jsonGenerator.writeStringField(FIELD_NAME_JOB_ID, jobDetails.getJobId().toString());
- jsonGenerator.writeStringField(FIELD_NAME_JOB_NAME, jobDetails.getJobName());
- jsonGenerator.writeStringField(FIELD_NAME_STATUS, jobDetails.getStatus().name());
-
- jsonGenerator.writeNumberField(FIELD_NAME_START_TIME, jobDetails.getStartTime());
- jsonGenerator.writeNumberField(FIELD_NAME_END_TIME, jobDetails.getEndTime());
- jsonGenerator.writeNumberField(FIELD_NAME_DURATION, jobDetails.getDuration());
- jsonGenerator.writeNumberField(
- FIELD_NAME_LAST_MODIFICATION, jobDetails.getLastUpdateTime());
-
- jsonGenerator.writeObjectFieldStart("tasks");
- jsonGenerator.writeNumberField(FIELD_NAME_TOTAL_NUMBER_TASKS, jobDetails.getNumTasks());
-
- final int[] perState = jobDetails.getTasksPerState();
-
- for (ExecutionState executionState : ExecutionState.values()) {
- jsonGenerator.writeNumberField(
- executionState.name().toLowerCase(), perState[executionState.ordinal()]);
- }
-
- jsonGenerator.writeEndObject();
-
- jsonGenerator.writeEndObject();
- }
- }
-
- public static final class JobDetailsDeserializer extends StdDeserializer<JobDetails> {
-
- private static final long serialVersionUID = 6089784742093294800L;
-
- public JobDetailsDeserializer() {
- super(JobDetails.class);
- }
-
- @Override
- public JobDetails deserialize(
- JsonParser jsonParser, DeserializationContext deserializationContext)
- throws IOException {
-
- JsonNode rootNode = jsonParser.readValueAsTree();
-
- JobID jobId = JobID.fromHexString(rootNode.get(FIELD_NAME_JOB_ID).textValue());
- String jobName = rootNode.get(FIELD_NAME_JOB_NAME).textValue();
- long startTime = rootNode.get(FIELD_NAME_START_TIME).longValue();
- long endTime = rootNode.get(FIELD_NAME_END_TIME).longValue();
- long duration = rootNode.get(FIELD_NAME_DURATION).longValue();
- JobStatus jobStatus = JobStatus.valueOf(rootNode.get(FIELD_NAME_STATUS).textValue());
- long lastUpdateTime = rootNode.get(FIELD_NAME_LAST_MODIFICATION).longValue();
-
- JsonNode tasksNode = rootNode.get("tasks");
- int numTasks = tasksNode.get(FIELD_NAME_TOTAL_NUMBER_TASKS).intValue();
-
- int[] numVerticesPerExecutionState = new int[ExecutionState.values().length];
-
- for (ExecutionState executionState : ExecutionState.values()) {
- JsonNode jsonNode = tasksNode.get(executionState.name().toLowerCase());
-
- numVerticesPerExecutionState[executionState.ordinal()] =
- jsonNode == null ? 0 : jsonNode.intValue();
- }
-
- return new JobDetails(
- jobId,
- jobName,
- startTime,
- endTime,
- duration,
- jobStatus,
- lastUpdateTime,
- numVerticesPerExecutionState,
- numTasks,
- new HashMap<>());
- }
- }
-
/**
* The CurrentAttempts holds the attempt number of the current representative execution attempt,
* and the attempt numbers of all the running attempts.