You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/03 12:19:54 UTC

[flink] branch master updated: [hotfix][table-planner] Use jdk 8 and java time modules for optional ser/de

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2529767  [hotfix][table-planner] Use jdk 8 and java time modules for optional ser/de
2529767 is described below

commit 2529767ca6708e45a4068b07349c81b15c7bb398
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Jan 31 17:29:25 2022 +0100

    [hotfix][table-planner] Use jdk 8 and java time modules for optional ser/de
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
 .../nodes/exec/serde/DurationJsonDeserializer.java | 45 ---------------
 .../nodes/exec/serde/DurationJsonSerializer.java   | 44 ---------------
 .../plan/nodes/exec/serde/JsonSerdeUtil.java       |  9 ++-
 .../exec/serde/LogicalWindowJsonDeserializer.java  |  9 +--
 .../nodes/exec/serde/DurationJsonSerdeTest.java    | 65 ----------------------
 5 files changed, 11 insertions(+), 161 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonDeserializer.java
deleted file mode 100644
index e327efd..0000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonDeserializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.serde;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-
-import java.io.IOException;
-import java.time.Duration;
-
-/**
- * JSON deserializer for {@link Duration}. refer to {@link DurationJsonSerializer} for serializer.
- */
-public class DurationJsonDeserializer extends StdDeserializer<Duration> {
-    private static final long serialVersionUID = 1L;
-
-    public DurationJsonDeserializer() {
-        super(Duration.class);
-    }
-
-    @Override
-    public Duration deserialize(
-            JsonParser jsonParser, DeserializationContext deserializationContext)
-            throws IOException {
-        String duration = jsonParser.readValueAs(String.class);
-        return Duration.parse(duration);
-    }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonSerializer.java
deleted file mode 100644
index cb2d27e..0000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonSerializer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.serde;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
-
-import java.io.IOException;
-import java.time.Duration;
-
-/**
- * JSON serializer for {@link Duration}. refer to {@link DurationJsonDeserializer} for deserializer.
- */
-public class DurationJsonSerializer extends StdSerializer<Duration> {
-    private static final long serialVersionUID = 1L;
-
-    public DurationJsonSerializer() {
-        super(Duration.class);
-    }
-
-    @Override
-    public void serialize(
-            Duration duration, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
-            throws IOException {
-        jsonGenerator.writeString(duration.toString());
-    }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index 5e83e95..f82b52e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -45,11 +45,14 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.Module;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
@@ -60,7 +63,6 @@ import org.apache.calcite.rex.RexWindowBound;
 import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Constructor;
-import java.time.Duration;
 import java.util.Optional;
 
 /** An utility class that provide abilities for JSON serialization and deserialization. */
@@ -93,6 +95,9 @@ public class JsonSerdeUtil {
                         .getTypeFactory()
                         .withClassLoader(JsonSerdeUtil.class.getClassLoader()));
         OBJECT_MAPPER_INSTANCE.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
+        OBJECT_MAPPER_INSTANCE.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
+        OBJECT_MAPPER_INSTANCE.registerModule(new Jdk8Module().configureAbsentsAsNulls(true));
+        OBJECT_MAPPER_INSTANCE.registerModule(new JavaTimeModule());
         OBJECT_MAPPER_INSTANCE.registerModule(createFlinkTableJacksonModule());
     }
 
@@ -136,7 +141,6 @@ public class JsonSerdeUtil {
         // RexNode is used in many exec nodes, so we register its serializer directly here
         module.addSerializer(new RexNodeJsonSerializer());
         module.addSerializer(new AggregateCallJsonSerializer());
-        module.addSerializer(new DurationJsonSerializer());
         module.addSerializer(new ChangelogModeJsonSerializer());
         module.addSerializer(new LogicalWindowJsonSerializer());
         module.addSerializer(new RexWindowBoundJsonSerializer());
@@ -164,7 +168,6 @@ public class JsonSerdeUtil {
         // with RexLiteral instead of RexNode.
         module.addDeserializer(RexLiteral.class, (StdDeserializer) new RexNodeJsonDeserializer());
         module.addDeserializer(AggregateCall.class, new AggregateCallJsonDeserializer());
-        module.addDeserializer(Duration.class, new DurationJsonDeserializer());
         module.addDeserializer(ChangelogMode.class, new ChangelogModeJsonDeserializer());
         module.addDeserializer(LogicalWindow.class, new LogicalWindowJsonDeserializer());
         module.addDeserializer(RexWindowBound.class, new RexWindowBoundJsonDeserializer());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
index af5cffb..d6d9151 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
@@ -37,6 +37,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
 import java.io.IOException;
 import java.time.Duration;
 
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.traverse;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer.FIELD_NAME_ALIAS;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer.FIELD_NAME_FIELD_INDEX;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer.FIELD_NAME_FIELD_NAME;
@@ -83,7 +84,7 @@ public class LogicalWindowJsonDeserializer extends StdDeserializer<LogicalWindow
                 if (isTimeTumblingWindow) {
                     Duration size =
                             deserializationContext.readValue(
-                                    jsonNode.get(FIELD_NAME_SIZE).traverse(jsonParser.getCodec()),
+                                    traverse(jsonNode.get(FIELD_NAME_SIZE), jsonParser.getCodec()),
                                     Duration.class);
                     return new TumblingGroupWindow(
                             alias, timeField, new ValueLiteralExpression(size));
@@ -97,11 +98,11 @@ public class LogicalWindowJsonDeserializer extends StdDeserializer<LogicalWindow
                 if (isTimeSlidingWindow) {
                     Duration size =
                             deserializationContext.readValue(
-                                    jsonNode.get(FIELD_NAME_SIZE).traverse(jsonParser.getCodec()),
+                                    traverse(jsonNode.get(FIELD_NAME_SIZE), jsonParser.getCodec()),
                                     Duration.class);
                     Duration slide =
                             deserializationContext.readValue(
-                                    jsonNode.get(FIELD_NAME_SLIDE).traverse(jsonParser.getCodec()),
+                                    traverse(jsonNode.get(FIELD_NAME_SLIDE), jsonParser.getCodec()),
                                     Duration.class);
                     return new SlidingGroupWindow(
                             alias,
@@ -120,7 +121,7 @@ public class LogicalWindowJsonDeserializer extends StdDeserializer<LogicalWindow
             case KIND_SESSION:
                 Duration gap =
                         deserializationContext.readValue(
-                                jsonNode.get(FIELD_NAME_GAP).traverse(jsonParser.getCodec()),
+                                traverse(jsonNode.get(FIELD_NAME_GAP), jsonParser.getCodec()),
                                 Duration.class);
                 return new SessionGroupWindow(alias, timeField, new ValueLiteralExpression(gap));
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonSerdeTest.java
deleted file mode 100644
index b175a16..0000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonSerdeTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.serde;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/** Tests for {@link Duration} serialization and deserialization. */
-@RunWith(Parameterized.class)
-public class DurationJsonSerdeTest {
-
-    @Parameterized.Parameter public Duration duration;
-
-    @Test
-    public void testDurationSerde() throws IOException {
-        ObjectMapper mapper = JsonSerdeUtil.getObjectMapper();
-
-        StringWriter writer = new StringWriter(100);
-        try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
-            gen.writeObject(duration);
-        }
-        String json = writer.toString();
-        Duration actual = mapper.readValue(json, Duration.class);
-        assertEquals(duration, actual);
-    }
-
-    @Parameterized.Parameters(name = "{0}")
-    public static List<Duration> testData() {
-        return Arrays.asList(
-                Duration.ofNanos(1234567890),
-                Duration.ofMillis(123456789),
-                Duration.ofSeconds(12345),
-                Duration.ofMinutes(123),
-                Duration.ofHours(5),
-                Duration.ofDays(10));
-    }
-}