You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/01 23:11:50 UTC

[2/2] flink git commit: [FLINK-8141] [flip6] Fix JsonPlan serialization in JobDetailsInfo

[FLINK-8141] [flip6] Fix JsonPlan serialization in JobDetailsInfo

The JsonPlan in JobDetailsInfo must be serialized as a raw value
to make it parsable for downstream applications.

This closes #5109.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee9027e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee9027e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee9027e4

Branch: refs/heads/master
Commit: ee9027e491ff50bf72f51c869a5095aef7092396
Parents: c1ffc11
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 12:17:50 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Dec 2 00:11:21 2017 +0100

----------------------------------------------------------------------
 .../rest/handler/job/JobDetailsHandler.java     |  2 +-
 .../rest/messages/job/JobDetailsInfo.java       |  5 ++-
 .../rest/messages/json/RawJsonDeserializer.java | 45 ++++++++++++++++++++
 .../rest/messages/job/JobDetailsInfoTest.java   |  2 +-
 4 files changed, 51 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee9027e4/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 0c0ee18..647763a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -126,7 +126,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
 			executionGraph.getJsonPlan());
 	}
 
-	public static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
+	private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
 			AccessExecutionJobVertex ejv,
 			long now,
 			JobID jobId,

http://git-wip-us.apache.org/repos/asf/flink/blob/ee9027e4/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
index e4d04d5..551913f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
@@ -27,10 +27,12 @@ import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
 import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
 import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
 import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.runtime.rest.messages.json.RawJsonDeserializer;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonRawValue;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
@@ -103,6 +105,7 @@ public class JobDetailsInfo implements ResponseBody {
 	private final Map<ExecutionState, Integer> jobVerticesPerState;
 
 	@JsonProperty(FIELD_NAME_JSON_PLAN)
+	@JsonRawValue
 	private final String jsonPlan;
 
 	@JsonCreator
@@ -118,7 +121,7 @@ public class JobDetailsInfo implements ResponseBody {
 			@JsonProperty(FIELD_NAME_TIMESTAMPS) Map<JobStatus, Long> timestamps,
 			@JsonProperty(FIELD_NAME_JOB_VERTEX_INFOS) Collection<JobVertexDetailsInfo> jobVertexInfos,
 			@JsonProperty(FIELD_NAME_JOB_VERTICES_PER_STATE) Map<ExecutionState, Integer> jobVerticesPerState,
-			@JsonProperty(FIELD_NAME_JSON_PLAN) String jsonPlan) {
+			@JsonProperty(FIELD_NAME_JSON_PLAN) @JsonDeserialize(using = RawJsonDeserializer.class) String jsonPlan) {
 		this.jobId = Preconditions.checkNotNull(jobId);
 		this.name = Preconditions.checkNotNull(name);
 		this.isStoppable = isStoppable;

http://git-wip-us.apache.org/repos/asf/flink/blob/ee9027e4/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/RawJsonDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/RawJsonDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/RawJsonDeserializer.java
new file mode 100644
index 0000000..c423917
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/RawJsonDeserializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Json deserializer which deserializes raw json.
+ */
+public final class RawJsonDeserializer extends StdDeserializer<String> {
+
+	private static final long serialVersionUID = -4089499607872996396L;
+
+	protected RawJsonDeserializer() {
+		super(String.class);
+	}
+
+	@Override
+	public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+		final JsonNode jsonNode = ctxt.readValue(p, JsonNode.class);
+
+		return jsonNode.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee9027e4/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
index 5e2e09d..aec8674 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
@@ -44,7 +44,7 @@ public class JobDetailsInfoTest extends RestResponseMarshallingTestBase<JobDetai
 	protected JobDetailsInfo getTestResponseInstance() throws Exception {
 		final Random random = new Random();
 		final int numJobVertexDetailsInfos = 4;
-		final String jsonPlan = "{id: \"1234\"}";
+		final String jsonPlan = "{\"id\":\"1234\"}";
 
 		final Map<JobStatus, Long> timestamps = new HashMap<>(JobStatus.values().length);
 		final Collection<JobDetailsInfo.JobVertexDetailsInfo> jobVertexInfos = new ArrayList<>(numJobVertexDetailsInfos);