You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/11/22 08:22:39 UTC

[flink] branch master updated: [FLINK-29423][rest] Remove custom JobDetails serializer

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ec89c17da5d [FLINK-29423][rest] Remove custom JobDetails serializer
ec89c17da5d is described below

commit ec89c17da5d555ca44f89d3f8739fa9ae00734b7
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Nov 11 13:55:40 2022 +0100

    [FLINK-29423][rest] Remove custom JobDetails serializer
---
 .../shortcodes/generated/rest_v1_dispatcher.html   |  33 +++-
 docs/static/generated/rest_v1_dispatcher.yml       |  41 ++---
 .../src/test/resources/rest_api_v1.snapshot        |  33 +++-
 .../runtime/messages/webmonitor/JobDetails.java    | 167 +++++++++------------
 4 files changed, 141 insertions(+), 133 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index ac1c58c1299..baa681d261b 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -1096,7 +1096,38 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
     "jobs" : {
       "type" : "array",
       "items" : {
-        "type" : "any"
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails",
+        "properties" : {
+          "duration" : {
+            "type" : "integer"
+          },
+          "end-time" : {
+            "type" : "integer"
+          },
+          "jid" : {
+            "type" : "any"
+          },
+          "last-modification" : {
+            "type" : "integer"
+          },
+          "name" : {
+            "type" : "string"
+          },
+          "start-time" : {
+            "type" : "integer"
+          },
+          "state" : {
+            "type" : "string",
+            "enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
+          },
+          "tasks" : {
+            "type" : "object",
+            "additionalProperties" : {
+              "type" : "integer"
+            }
+          }
+        }
       }
     }
   }
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index c1dcc55d097..146b66469f3 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -1886,18 +1886,6 @@ components:
         total:
           type: integer
           format: int64
-    CurrentAttempts:
-      type: object
-      properties:
-        currentAttempts:
-          uniqueItems: true
-          type: array
-          items:
-            type: integer
-            format: int32
-        representativeAttempt:
-          type: integer
-          format: int32
     DashboardConfiguration:
       type: object
       properties:
@@ -2218,36 +2206,27 @@ components:
     JobDetails:
       type: object
       properties:
-        currentExecutionAttempts:
-          type: object
-          additionalProperties:
-            type: object
-            additionalProperties:
-              $ref: '#/components/schemas/CurrentAttempts'
         duration:
           type: integer
           format: int64
-        endTime:
+        end-time:
           type: integer
           format: int64
-        jobId:
+        jid:
           $ref: '#/components/schemas/JobID'
-        jobName:
-          type: string
-        lastUpdateTime:
+        last-modification:
           type: integer
           format: int64
-        numTasks:
-          type: integer
-          format: int32
-        startTime:
+        name:
+          type: string
+        start-time:
           type: integer
           format: int64
-        status:
+        state:
           $ref: '#/components/schemas/JobStatus'
-        tasksPerState:
-          type: array
-          items:
+        tasks:
+          type: object
+          additionalProperties:
             type: integer
             format: int32
     JobDetailsInfo:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index ea31ca3d138..0ae17a6ce11 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -786,7 +786,38 @@
         "jobs" : {
           "type" : "array",
           "items" : {
-            "type" : "any"
+            "type" : "object",
+            "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobDetails",
+            "properties" : {
+              "jid" : {
+                "type" : "any"
+              },
+              "name" : {
+                "type" : "string"
+              },
+              "start-time" : {
+                "type" : "integer"
+              },
+              "end-time" : {
+                "type" : "integer"
+              },
+              "duration" : {
+                "type" : "integer"
+              },
+              "state" : {
+                "type" : "string",
+                "enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
+              },
+              "last-modification" : {
+                "type" : "integer"
+              },
+              "tasks" : {
+                "type" : "object",
+                "additionalProperties" : {
+                  "type" : "integer"
+                }
+              }
+            }
           }
         }
       }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index 04fce9483c0..ed53e2014fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -26,23 +26,21 @@ import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -50,8 +48,6 @@ import java.util.stream.Collectors;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** An actor message with a detailed overview of the current status of a job. */
-@JsonSerialize(using = JobDetails.JobDetailsSerializer.class)
-@JsonDeserialize(using = JobDetails.JobDetailsDeserializer.class)
 public class JobDetails implements Serializable {
 
     private static final long serialVersionUID = -3391462110304948766L;
@@ -64,6 +60,7 @@ public class JobDetails implements Serializable {
     private static final String FIELD_NAME_STATUS = "state";
     private static final String FIELD_NAME_LAST_MODIFICATION = "last-modification";
     private static final String FIELD_NAME_TOTAL_NUMBER_TASKS = "total";
+    private static final String FIELD_NAME_TASKS = "tasks";
 
     private final JobID jobId;
 
@@ -83,6 +80,8 @@ public class JobDetails implements Serializable {
 
     private final int numTasks;
 
+    private transient Map<String, Integer> lazyTaskInfo = null;
+
     /**
      * The map holds the attempt number of the current execution attempt in the Execution, which is
      * considered as the representing execution for the subtask of the vertex. The keys and values
@@ -93,6 +92,29 @@ public class JobDetails implements Serializable {
      */
     private final Map<String, Map<Integer, CurrentAttempts>> currentExecutionAttempts;
 
+    @JsonCreator
+    public JobDetails(
+            @JsonProperty(FIELD_NAME_JOB_ID) @JsonDeserialize(using = JobIDDeserializer.class)
+                    JobID jobId,
+            @JsonProperty(FIELD_NAME_JOB_NAME) String jobName,
+            @JsonProperty(FIELD_NAME_START_TIME) long startTime,
+            @JsonProperty(FIELD_NAME_END_TIME) long endTime,
+            @JsonProperty(FIELD_NAME_DURATION) long duration,
+            @JsonProperty(FIELD_NAME_STATUS) JobStatus status,
+            @JsonProperty(FIELD_NAME_LAST_MODIFICATION) long lastUpdateTime,
+            @JsonProperty(FIELD_NAME_TASKS) Map<String, Integer> taskInfo) {
+        this(
+                jobId,
+                jobName,
+                startTime,
+                endTime,
+                duration,
+                status,
+                lastUpdateTime,
+                extractNumTasksPerState(taskInfo),
+                taskInfo.get(FIELD_NAME_TOTAL_NUMBER_TASKS));
+    }
+
     @VisibleForTesting
     public JobDetails(
             JobID jobId,
@@ -197,47 +219,82 @@ public class JobDetails implements Serializable {
 
     // ------------------------------------------------------------------------
 
+    @JsonProperty(FIELD_NAME_JOB_ID)
+    @JsonSerialize(using = JobIDSerializer.class)
     public JobID getJobId() {
         return jobId;
     }
 
+    @JsonProperty(FIELD_NAME_JOB_NAME)
     public String getJobName() {
         return jobName;
     }
 
+    @JsonProperty(FIELD_NAME_START_TIME)
     public long getStartTime() {
         return startTime;
     }
 
+    @JsonProperty(FIELD_NAME_END_TIME)
     public long getEndTime() {
         return endTime;
     }
 
+    @JsonProperty(FIELD_NAME_DURATION)
     public long getDuration() {
         return duration;
     }
 
+    @JsonProperty(FIELD_NAME_STATUS)
     public JobStatus getStatus() {
         return status;
     }
 
+    @JsonProperty(FIELD_NAME_LAST_MODIFICATION)
     public long getLastUpdateTime() {
         return lastUpdateTime;
     }
 
+    @JsonProperty(FIELD_NAME_TASKS)
+    public Map<String, Integer> getTaskInfo() {
+        if (lazyTaskInfo == null) {
+            final Map<String, Integer> taskInfo = new HashMap<>();
+            taskInfo.put(FIELD_NAME_TOTAL_NUMBER_TASKS, getNumTasks());
+            for (ExecutionState executionState : ExecutionState.values()) {
+                taskInfo.put(
+                        executionState.name().toLowerCase(),
+                        tasksPerState[executionState.ordinal()]);
+            }
+            lazyTaskInfo = taskInfo;
+        }
+        return lazyTaskInfo;
+    }
+
+    @JsonIgnore
     public int getNumTasks() {
         return numTasks;
     }
 
+    @JsonIgnore
     public int[] getTasksPerState() {
         return tasksPerState;
     }
 
+    @JsonIgnore
     public Map<String, Map<Integer, CurrentAttempts>> getCurrentExecutionAttempts() {
         return currentExecutionAttempts;
     }
     // ------------------------------------------------------------------------
 
+    private static int[] extractNumTasksPerState(Map<String, Integer> ex) {
+        int[] tasksPerState = new int[ExecutionState.values().length];
+        for (ExecutionState value : ExecutionState.values()) {
+            tasksPerState[value.ordinal()] =
+                    ex.getOrDefault(value.name().toLowerCase(Locale.ROOT), 0);
+        }
+        return tasksPerState;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -296,96 +353,6 @@ public class JobDetails implements Serializable {
                 + '}';
     }
 
-    public static final class JobDetailsSerializer extends StdSerializer<JobDetails> {
-        private static final long serialVersionUID = 7915913423515194428L;
-
-        public JobDetailsSerializer() {
-            super(JobDetails.class);
-        }
-
-        @Override
-        public void serialize(
-                JobDetails jobDetails,
-                JsonGenerator jsonGenerator,
-                SerializerProvider serializerProvider)
-                throws IOException {
-            jsonGenerator.writeStartObject();
-
-            jsonGenerator.writeStringField(FIELD_NAME_JOB_ID, jobDetails.getJobId().toString());
-            jsonGenerator.writeStringField(FIELD_NAME_JOB_NAME, jobDetails.getJobName());
-            jsonGenerator.writeStringField(FIELD_NAME_STATUS, jobDetails.getStatus().name());
-
-            jsonGenerator.writeNumberField(FIELD_NAME_START_TIME, jobDetails.getStartTime());
-            jsonGenerator.writeNumberField(FIELD_NAME_END_TIME, jobDetails.getEndTime());
-            jsonGenerator.writeNumberField(FIELD_NAME_DURATION, jobDetails.getDuration());
-            jsonGenerator.writeNumberField(
-                    FIELD_NAME_LAST_MODIFICATION, jobDetails.getLastUpdateTime());
-
-            jsonGenerator.writeObjectFieldStart("tasks");
-            jsonGenerator.writeNumberField(FIELD_NAME_TOTAL_NUMBER_TASKS, jobDetails.getNumTasks());
-
-            final int[] perState = jobDetails.getTasksPerState();
-
-            for (ExecutionState executionState : ExecutionState.values()) {
-                jsonGenerator.writeNumberField(
-                        executionState.name().toLowerCase(), perState[executionState.ordinal()]);
-            }
-
-            jsonGenerator.writeEndObject();
-
-            jsonGenerator.writeEndObject();
-        }
-    }
-
-    public static final class JobDetailsDeserializer extends StdDeserializer<JobDetails> {
-
-        private static final long serialVersionUID = 6089784742093294800L;
-
-        public JobDetailsDeserializer() {
-            super(JobDetails.class);
-        }
-
-        @Override
-        public JobDetails deserialize(
-                JsonParser jsonParser, DeserializationContext deserializationContext)
-                throws IOException {
-
-            JsonNode rootNode = jsonParser.readValueAsTree();
-
-            JobID jobId = JobID.fromHexString(rootNode.get(FIELD_NAME_JOB_ID).textValue());
-            String jobName = rootNode.get(FIELD_NAME_JOB_NAME).textValue();
-            long startTime = rootNode.get(FIELD_NAME_START_TIME).longValue();
-            long endTime = rootNode.get(FIELD_NAME_END_TIME).longValue();
-            long duration = rootNode.get(FIELD_NAME_DURATION).longValue();
-            JobStatus jobStatus = JobStatus.valueOf(rootNode.get(FIELD_NAME_STATUS).textValue());
-            long lastUpdateTime = rootNode.get(FIELD_NAME_LAST_MODIFICATION).longValue();
-
-            JsonNode tasksNode = rootNode.get("tasks");
-            int numTasks = tasksNode.get(FIELD_NAME_TOTAL_NUMBER_TASKS).intValue();
-
-            int[] numVerticesPerExecutionState = new int[ExecutionState.values().length];
-
-            for (ExecutionState executionState : ExecutionState.values()) {
-                JsonNode jsonNode = tasksNode.get(executionState.name().toLowerCase());
-
-                numVerticesPerExecutionState[executionState.ordinal()] =
-                        jsonNode == null ? 0 : jsonNode.intValue();
-            }
-
-            return new JobDetails(
-                    jobId,
-                    jobName,
-                    startTime,
-                    endTime,
-                    duration,
-                    jobStatus,
-                    lastUpdateTime,
-                    numVerticesPerExecutionState,
-                    numTasks,
-                    new HashMap<>());
-        }
-    }
-
     /**
      * The CurrentAttempts holds the attempt number of the current representative execution attempt,
      * and the attempt numbers of all the running attempts.