You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/13 07:18:31 UTC
[3/4] flink git commit: [FLINK-9771][rest] Fix plan JSON response
[FLINK-9771][rest] Fix plan JSON response
This closes #6274.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa160b52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa160b52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa160b52
Branch: refs/heads/release-1.5
Commit: aa160b528922c1e0781941b3d2140540919ac326
Parents: 2f7ab90
Author: zentol <ch...@apache.org>
Authored: Fri Jul 6 11:22:20 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Jul 13 06:10:32 2018 +0200
----------------------------------------------------------------------
.../runtime/rest/messages/JobPlanInfo.java | 115 +++++++++++++------
.../runtime/rest/messages/JobPlanInfoTest.java | 9 +-
2 files changed, 86 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aa160b52/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
index 11cb89b..7263b36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
@@ -21,9 +21,12 @@ package org.apache.flink.runtime.rest.messages;
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
@@ -36,18 +39,20 @@ import java.util.Objects;
/**
* Response type of the {@link JobPlanHandler}.
*/
-@JsonSerialize(using = JobPlanInfo.Serializer.class)
-@JsonDeserialize(using = JobPlanInfo.Deserializer.class)
public class JobPlanInfo implements ResponseBody {
- private final String jsonPlan;
+ private static final String FIELD_NAME_PLAN = "plan";
+
+ @JsonProperty(FIELD_NAME_PLAN)
+ private final RawJson jsonPlan;
public JobPlanInfo(String jsonPlan) {
- this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
+ this(new RawJson(Preconditions.checkNotNull(jsonPlan)));
}
- public String getJsonPlan() {
- return jsonPlan;
+ @JsonCreator
+ public JobPlanInfo(@JsonProperty(FIELD_NAME_PLAN) RawJson jsonPlan) {
+ this.jsonPlan = jsonPlan;
}
@Override
@@ -67,47 +72,91 @@ public class JobPlanInfo implements ResponseBody {
return Objects.hash(jsonPlan);
}
- //---------------------------------------------------------------------------------
- // Static helper classes
- //---------------------------------------------------------------------------------
+ @Override
+ public String toString() {
+ return "JobPlanInfo{" +
+ "jsonPlan=" + jsonPlan +
+ '}';
+ }
/**
- * Json serializer for the {@link JobPlanInfo}.
+ * Simple wrapper around a raw JSON string.
*/
- public static final class Serializer extends StdSerializer<JobPlanInfo> {
+ @JsonSerialize(using = RawJson.Serializer.class)
+ @JsonDeserialize(using = RawJson.Deserializer.class)
+ public static final class RawJson {
+ private final String json;
- private static final long serialVersionUID = -1551666039618928811L;
+ private RawJson(String json) {
+ this.json = json;
+ }
- public Serializer() {
- super(JobPlanInfo.class);
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RawJson rawJson = (RawJson) o;
+ return Objects.equals(json, rawJson.json);
}
@Override
- public void serialize(
- JobPlanInfo jobPlanInfo,
- JsonGenerator jsonGenerator,
- SerializerProvider serializerProvider) throws IOException {
- jsonGenerator.writeString(jobPlanInfo.getJsonPlan());
+ public int hashCode() {
+ return Objects.hash(json);
}
- }
- /**
- * Json deserializer for the {@link JobPlanInfo}.
- */
- public static final class Deserializer extends StdDeserializer<JobPlanInfo> {
+ @Override
+ public String toString() {
+ return "RawJson{" +
+ "json='" + json + '\'' +
+ '}';
+ }
+
+ //---------------------------------------------------------------------------------
+ // Static helper classes
+ //---------------------------------------------------------------------------------
+
+ /**
+ * Json serializer for the {@link RawJson}.
+ */
+ public static final class Serializer extends StdSerializer<RawJson> {
+
+ private static final long serialVersionUID = -1551666039618928811L;
- private static final long serialVersionUID = -3580088509877177213L;
+ public Serializer() {
+ super(RawJson.class);
+ }
- public Deserializer() {
- super(JobPlanInfo.class);
+ @Override
+ public void serialize(
+ RawJson jobPlanInfo,
+ JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider) throws IOException {
+ jsonGenerator.writeRawValue(jobPlanInfo.json);
+ }
}
- @Override
- public JobPlanInfo deserialize(
- JsonParser jsonParser,
- DeserializationContext deserializationContext) throws IOException {
- final String jsonPlan = jsonParser.getText();
- return new JobPlanInfo(jsonPlan);
+ /**
+ * Json deserializer for the {@link RawJson}.
+ */
+ public static final class Deserializer extends StdDeserializer<RawJson> {
+
+ private static final long serialVersionUID = -3580088509877177213L;
+
+ public Deserializer() {
+ super(RawJson.class);
+ }
+
+ @Override
+ public RawJson deserialize(
+ JsonParser jsonParser,
+ DeserializationContext deserializationContext) throws IOException {
+ final JsonNode rootNode = jsonParser.readValueAsTree();
+ return new RawJson(rootNode.toString());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/aa160b52/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobPlanInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobPlanInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobPlanInfoTest.java
index 8c5cad5..22d5a71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobPlanInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobPlanInfoTest.java
@@ -18,7 +18,8 @@
package org.apache.flink.runtime.rest.messages;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
/**
* Tests that the {@link JobPlanInfo} can be marshalled and unmarshalled.
@@ -32,9 +33,7 @@ public class JobPlanInfoTest extends RestResponseMarshallingTestBase<JobPlanInfo
@Override
protected JobPlanInfo getTestResponseInstance() {
- JobID jobID = new JobID();
- String jobName = "job_007";
- String jsonPlan = "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}";
- return new JobPlanInfo(jsonPlan);
+ JobGraph jg = new JobGraph("job_007");
+ return new JobPlanInfo(JsonPlanGenerator.generatePlan(jg));
}
}