Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/06 10:35:37 UTC

[GitHub] [flink] twalthr commented on a change in pull request #18264: [FLINK-25518][table-planner] Harden JSON utilities for serde of the persisted plan

twalthr commented on a change in pull request #18264:
URL: https://github.com/apache/flink/pull/18264#discussion_r779434345



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
##########
@@ -176,14 +173,32 @@ private RelDataType deserialize(JsonNode jsonNode, FlinkDeserializationContext c
                                 : null;
                 final RelDataType type;
                 if (sqlTypeName == SqlTypeName.ARRAY) {
-                    RelDataType elementType = deserialize(objectNode.get(FIELD_NAME_ELEMENT), ctx);
+                    RelDataType elementType =
+                            deserialize(

Review comment:
       This makes the code quite verbose. Now `.traverse(jsonParser.getCodec())` is everywhere in the code. Isn't it possible to restore the original code at this location?
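
For illustration only, one way to address the verbosity would be a small shared helper that hides the repeated pattern, so call sites stay close to the original one-liners. This is a hedged sketch with invented names (`JsonNodeReadHelper`, `readChild`), not code from the PR:

```java
// Hedged sketch, not PR code: a shared helper around the repeated traverse/readValue pattern.
import java.io.IOException;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;

final class JsonNodeReadHelper {

    private JsonNodeReadHelper() {
        // static utility, no instances
    }

    /** Reads a child node into the target class while keeping the active context. */
    static <T> T readChild(
            JsonNode node, JsonParser parser, DeserializationContext ctx, Class<T> clazz)
            throws IOException {
        // Re-expose the subtree as a parser bound to the same codec. Tree-traversing parsers
        // start before their first token, so advance once if necessary.
        final JsonParser childParser = node.traverse(parser.getCodec());
        if (!childParser.hasCurrentToken()) {
            childParser.nextToken();
        }
        // Reading through the DeserializationContext keeps its configuration and attributes.
        return ctx.readValue(childParser, clazz);
    }
}
```

With such a helper, a call site would pass the child node plus the surrounding parser and context instead of spelling out `traverse(jsonParser.getCodec())` every time.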

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
##########
@@ -42,22 +60,83 @@ public static boolean hasJsonCreatorAnnotation(Class<?> clazz) {
         return false;
     }
 
-    /** Create an {@link ObjectMapper} which DeserializationContext wraps a {@link SerdeContext}. */
-    public static ObjectMapper createObjectMapper(SerdeContext serdeCtx) {
-        FlinkDeserializationContext ctx =
-                new FlinkDeserializationContext(
-                        new DefaultDeserializationContext.Impl(BeanDeserializerFactory.instance),
-                        serdeCtx);
-        ObjectMapper mapper =
-                new ObjectMapper(
-                        null, // JsonFactory
-                        null, // DefaultSerializerProvider
-                        ctx);
-        mapper.setTypeFactory(
-                mapper.getTypeFactory().withClassLoader(JsonSerdeUtil.class.getClassLoader()));
-        mapper.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
-        ctx.setObjectMapper(mapper);
-        return mapper;
+    // Object mapper shared instance to serialize and deserialize the plan
+    private static final ObjectMapper OBJECT_MAPPER_INSTANCE;
+
+    static {
+        OBJECT_MAPPER_INSTANCE = new ObjectMapper();
+
+        OBJECT_MAPPER_INSTANCE.setTypeFactory(
+                // Make sure to register the classloader of the planner
+                OBJECT_MAPPER_INSTANCE
+                        .getTypeFactory()
+                        .withClassLoader(JsonSerdeUtil.class.getClassLoader()));
+        OBJECT_MAPPER_INSTANCE.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
+        OBJECT_MAPPER_INSTANCE.registerModule(createFlinkTableJacksonModule());
+    }
+
+    /** Get the {@link ObjectMapper} instance. */
+    public static ObjectMapper getObjectMapper() {
+        return OBJECT_MAPPER_INSTANCE;
+    }
+
+    public static ObjectReader createObjectReader(SerdeContext serdeContext) {
+        return OBJECT_MAPPER_INSTANCE
+                .reader()
+                .withAttribute(SerdeContext.SERDE_CONTEXT_KEY, serdeContext);
+    }
+
+    public static ObjectWriter createObjectWriter(SerdeContext serdeContext) {
+        return OBJECT_MAPPER_INSTANCE
+                .writer()
+                .withAttribute(SerdeContext.SERDE_CONTEXT_KEY, serdeContext);
+    }
+
+    private static Module createFlinkTableJacksonModule() {
+        final SimpleModule module = new SimpleModule("Flink table types");

Review comment:
       nit: `types` is used in too many locations; call it `Flink table module` instead
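
As background for the `createObjectReader`/`createObjectWriter` methods in the diff above, the following standalone sketch (not the Flink code; `MyContext`, `CONTEXT_KEY`, and `Payload` are invented) shows how an attribute set via `ObjectReader#withAttribute` becomes visible to a custom deserializer through `DeserializationContext#getAttribute`:

```java
// Standalone sketch of the ObjectReader attribute mechanism; all names here are invented.
import java.io.IOException;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.module.SimpleModule;

public class AttributeRoundTrip {

    // Hypothetical attribute key, standing in for a SerdeContext-style key.
    static final String CONTEXT_KEY = "example.serde.context";

    /** Stand-in for a planner-side context object carried next to the shared mapper. */
    static final class MyContext {
        final ClassLoader classLoader;

        MyContext(ClassLoader classLoader) {
            this.classLoader = classLoader;
        }
    }

    static final class Payload {
        String value;
    }

    static final class PayloadDeserializer extends JsonDeserializer<Payload> {
        @Override
        public Payload deserialize(JsonParser parser, DeserializationContext ctx)
                throws IOException {
            // The attribute set via ObjectReader#withAttribute is visible on the context here.
            final MyContext context = (MyContext) ctx.getAttribute(CONTEXT_KEY);
            if (context == null) {
                throw new IllegalStateException("Context attribute was not set on the reader");
            }
            // A real deserializer would use the context (e.g. its class loader) to resolve types.
            final JsonNode node = parser.readValueAsTree();
            final Payload payload = new Payload();
            payload.value = node.get("value").asText();
            return payload;
        }
    }

    public static void main(String[] args) throws IOException {
        final ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(
                new SimpleModule("example")
                        .addDeserializer(Payload.class, new PayloadDeserializer()));

        final ObjectReader reader =
                mapper.reader()
                        .withAttribute(
                                CONTEXT_KEY,
                                new MyContext(AttributeRoundTrip.class.getClassLoader()));

        final Payload payload = reader.forType(Payload.class).readValue("{\"value\":\"hello\"}");
        System.out.println(payload.value); // prints "hello"
    }
}
```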

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
##########
@@ -68,24 +66,25 @@ public LogicalWindowJsonDeserializer() {
     @Override
     public LogicalWindow deserialize(
             JsonParser jsonParser, DeserializationContext deserializationContext)
-            throws IOException, JsonProcessingException {
-        FlinkDeserializationContext flinkDeserializationContext =
-                (FlinkDeserializationContext) deserializationContext;
-        ObjectMapper mapper = flinkDeserializationContext.getObjectMapper();
+            throws IOException {
         JsonNode jsonNode = jsonParser.readValueAsTree();
         String kind = jsonNode.get(FIELD_NAME_KIND).asText().toUpperCase();
         WindowReference alias =
-                mapper.readValue(jsonNode.get(FIELD_NAME_ALIAS).toString(), WindowReference.class);
+                jsonParser

Review comment:
       What is the difference between this and
   ```
   deserializationContext.readValue(
                                       jsonNode.get(FIELD_NAME_SIZE).traverse(jsonParser.getCodec()),
                                       Duration.class);
   ```
   Why is this not consistent?
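
For reference, the two styles being contrasted can be sketched as follows. This is an illustrative comparison, not the PR code; the diff above is truncated, so the first variant is an assumption about a codec-based read, and the `"alias"` field and `String` target are invented. A plausible reason to prefer the context-based form is that it reuses the active `DeserializationContext`, including attributes set on the `ObjectReader`, whereas a codec-based read starts fresh with the mapper's defaults:

```java
// Illustrative sketch only; "alias" and the String target are invented, and the codec-based
// variant is an assumption since the diff above is truncated before the actual call.
import java.io.IOException;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;

class AliasDeserializer extends JsonDeserializer<String> {

    @Override
    public String deserialize(JsonParser jsonParser, DeserializationContext ctx)
            throws IOException {
        final JsonNode jsonNode = jsonParser.readValueAsTree();
        final JsonNode aliasNode = jsonNode.get("alias");

        // Style 1 (assumed): read the child through the parser's codec. This starts a fresh
        // read with the mapper's defaults, so attributes set on the ObjectReader are not
        // visible to nested deserializers.
        final String viaCodec = jsonParser.getCodec().treeToValue(aliasNode, String.class);

        // Style 2 (the form quoted above): re-expose the child as a parser and read it through
        // the active DeserializationContext, keeping its configuration and attributes.
        // Tree-traversing parsers start before their first token, so advance once if needed.
        final JsonParser childParser = aliasNode.traverse(jsonParser.getCodec());
        if (!childParser.hasCurrentToken()) {
            childParser.nextToken();
        }
        final String viaContext = ctx.readValue(childParser, String.class);

        // Both yield the same value here; the difference only shows up once deserializers
        // depend on reader-level attributes or non-default configuration.
        return viaContext != null ? viaContext : viaCodec;
    }
}
```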

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
##########
@@ -107,17 +103,18 @@ private RelDataType deserialize(JsonNode jsonNode, FlinkDeserializationContext c
             } else if (objectNode.has(FIELD_NAME_STRUCTURED_TYPE)) {
                 JsonNode structuredTypeNode = objectNode.get(FIELD_NAME_STRUCTURED_TYPE);
                 LogicalType structuredType =
-                        ctx.getObjectMapper()
-                                .readValue(structuredTypeNode.toPrettyString(), LogicalType.class);
+                        ctx.readValue(
+                                structuredTypeNode.traverse(jsonParser.getCodec()),
+                                LogicalType.class);
                 checkArgument(structuredType instanceof StructuredType);
-                return ctx.getSerdeContext()
-                        .getTypeFactory()
-                        .createFieldTypeFromLogicalType(structuredType);
+                return serdeContext.getTypeFactory().createFieldTypeFromLogicalType(structuredType);
             } else if (objectNode.has(FIELD_NAME_STRUCT_KIND)) {
                 ArrayNode arrayNode = (ArrayNode) objectNode.get(FIELD_NAME_FIELDS);
                 RelDataTypeFactory.Builder builder = typeFactory.builder();
                 for (JsonNode node : arrayNode) {
-                    builder.add(node.get(FIELD_NAME_FILED_NAME).asText(), deserialize(node, ctx));
+                    builder.add(
+                            node.get(FIELD_NAME_FILED_NAME).asText(),
+                            deserialize(node.traverse(jsonParser.getCodec()), ctx));

Review comment:
       Let's reintroduce a method to avoid calling
   
   ```
   deserialize(node.traverse(jsonParser.getCodec()), ctx)
   ```
   repeatedly. In general, it would be good if every deserializer had a public `deserialize(JsonNode, SerdeContext)` method that other deserializers can call directly, without having to go through Jackson one more time.
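
A rough sketch of the suggested shape follows. All names (`SomeType`, `SerdeContextStub`, the `"serde.context.key"` attribute key) are invented rather than the actual Flink classes: the Jackson entry point resolves the context and parses the tree once, then delegates to a public node-based method that other deserializers can call directly:

```java
// Rough sketch of the suggested deserializer shape; names are invented, not Flink classes.
import java.io.IOException;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;

/** Stand-in for the planner's SerdeContext. */
final class SerdeContextStub {}

/** Stand-in for a type that is part of the persisted plan. */
final class SomeType {
    final String name;

    SomeType(String name) {
        this.name = name;
    }
}

class SomeTypeJsonDeserializer extends JsonDeserializer<SomeType> {

    // Hypothetical attribute key under which the context is registered on the ObjectReader.
    static final String SERDE_CONTEXT_KEY = "serde.context.key";

    @Override
    public SomeType deserialize(JsonParser jsonParser, DeserializationContext ctx)
            throws IOException {
        // Jackson entry point: resolve the context once, parse the tree once, then delegate.
        final SerdeContextStub serdeContext =
                (SerdeContextStub) ctx.getAttribute(SERDE_CONTEXT_KEY);
        return deserialize(jsonParser.readValueAsTree(), serdeContext);
    }

    /** Public entry point that other deserializers can call with an already-parsed node. */
    public SomeType deserialize(JsonNode jsonNode, SerdeContextStub serdeContext) {
        // The serdeContext would be used here to resolve class loaders, catalogs, etc.
        // Nested or recursive structures can be handled without another Jackson pass.
        return new SomeType(jsonNode.get("name").asText());
    }
}
```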



