You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/13 07:18:31 UTC

[3/4] flink git commit: [FLINK-9771][rest] Fix plan JSON response

[FLINK-9771][rest] Fix plan JSON response

This closes #6274.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa160b52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa160b52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa160b52

Branch: refs/heads/release-1.5
Commit: aa160b528922c1e0781941b3d2140540919ac326
Parents: 2f7ab90
Author: zentol <ch...@apache.org>
Authored: Fri Jul 6 11:22:20 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jul 13 06:10:32 2018 +0200

----------------------------------------------------------------------
 .../runtime/rest/messages/JobPlanInfo.java      | 115 +++++++++++++------
 .../runtime/rest/messages/JobPlanInfoTest.java  |   9 +-
 2 files changed, 86 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa160b52/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
index 11cb89b..7263b36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
@@ -21,9 +21,12 @@ package org.apache.flink.runtime.rest.messages;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
@@ -36,18 +39,20 @@ import java.util.Objects;
 /**
  * Response type of the {@link JobPlanHandler}.
  */
-@JsonSerialize(using = JobPlanInfo.Serializer.class)
-@JsonDeserialize(using = JobPlanInfo.Deserializer.class)
 public class JobPlanInfo implements ResponseBody {
 
-	private final String jsonPlan;
+	private static final String FIELD_NAME_PLAN = "plan";
+
+	@JsonProperty(FIELD_NAME_PLAN)
+	private final RawJson jsonPlan;
 
 	public JobPlanInfo(String jsonPlan) {
-		this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
+		this(new RawJson(Preconditions.checkNotNull(jsonPlan)));
 	}
 
-	public String getJsonPlan() {
-		return jsonPlan;
+	@JsonCreator
+	public JobPlanInfo(@JsonProperty(FIELD_NAME_PLAN) RawJson jsonPlan) {
+		this.jsonPlan = jsonPlan;
 	}
 
 	@Override
@@ -67,47 +72,91 @@ public class JobPlanInfo implements ResponseBody {
 		return Objects.hash(jsonPlan);
 	}
 
-	//---------------------------------------------------------------------------------
-	// Static helper classes
-	//---------------------------------------------------------------------------------
+	@Override
+	public String toString() {
+		return "JobPlanInfo{" +
+			"jsonPlan=" + jsonPlan +
+			'}';
+	}
 
 	/**
-	 * Json serializer for the {@link JobPlanInfo}.
+	 * Simple wrapper around a raw JSON string.
 	 */
-	public static final class Serializer extends StdSerializer<JobPlanInfo> {
+	@JsonSerialize(using = RawJson.Serializer.class)
+	@JsonDeserialize(using = RawJson.Deserializer.class)
+	public static final class RawJson {
+		private final String json;
 
-		private static final long serialVersionUID = -1551666039618928811L;
+		private RawJson(String json) {
+			this.json = json;
+		}
 
-		public Serializer() {
-			super(JobPlanInfo.class);
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			RawJson rawJson = (RawJson) o;
+			return Objects.equals(json, rawJson.json);
 		}
 
 		@Override
-		public void serialize(
-			JobPlanInfo jobPlanInfo,
-			JsonGenerator jsonGenerator,
-			SerializerProvider serializerProvider) throws IOException {
-			jsonGenerator.writeString(jobPlanInfo.getJsonPlan());
+		public int hashCode() {
+			return Objects.hash(json);
 		}
-	}
 
-	/**
-	 * Json deserializer for the {@link JobPlanInfo}.
-	 */
-	public static final class Deserializer extends StdDeserializer<JobPlanInfo> {
+		@Override
+		public String toString() {
+			return "RawJson{" +
+				"json='" + json + '\'' +
+				'}';
+		}
+
+		//---------------------------------------------------------------------------------
+		// Static helper classes
+		//---------------------------------------------------------------------------------
+
+		/**
+		 * Json serializer for the {@link RawJson}.
+		 */
+		public static final class Serializer extends StdSerializer<RawJson> {
+
+			private static final long serialVersionUID = -1551666039618928811L;
 
-		private static final long serialVersionUID = -3580088509877177213L;
+			public Serializer() {
+				super(RawJson.class);
+			}
 
-		public Deserializer() {
-			super(JobPlanInfo.class);
+			@Override
+			public void serialize(
+					RawJson jobPlanInfo,
+					JsonGenerator jsonGenerator,
+					SerializerProvider serializerProvider) throws IOException {
+				jsonGenerator.writeRawValue(jobPlanInfo.json);
+			}
 		}
 
-		@Override
-		public JobPlanInfo deserialize(
-			JsonParser jsonParser,
-			DeserializationContext deserializationContext) throws IOException {
-			final String jsonPlan = jsonParser.getText();
-			return new JobPlanInfo(jsonPlan);
+		/**
+		 * Json deserializer for the {@link RawJson}.
+		 */
+		public static final class Deserializer extends StdDeserializer<RawJson> {
+
+			private static final long serialVersionUID = -3580088509877177213L;
+
+			public Deserializer() {
+				super(RawJson.class);
+			}
+
+			@Override
+			public RawJson deserialize(
+					JsonParser jsonParser,
+					DeserializationContext deserializationContext) throws IOException {
+				final JsonNode rootNode = jsonParser.readValueAsTree();
+				return new RawJson(rootNode.toString());
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa160b52/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobPlanInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobPlanInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobPlanInfoTest.java
index 8c5cad5..22d5a71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobPlanInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobPlanInfoTest.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.runtime.rest.messages;
 
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 
 /**
  * Tests that the {@link JobPlanInfo} can be marshalled and unmarshalled.
@@ -32,9 +33,7 @@ public class JobPlanInfoTest extends RestResponseMarshallingTestBase<JobPlanInfo
 
 	@Override
 	protected JobPlanInfo getTestResponseInstance() {
-		JobID jobID = new JobID();
-		String jobName = "job_007";
-		String jsonPlan = "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}";
-		return new JobPlanInfo(jsonPlan);
+		JobGraph jg = new JobGraph("job_007");
+		return new JobPlanInfo(JsonPlanGenerator.generatePlan(jg));
 	}
 }