You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/03 12:19:54 UTC
[flink] branch master updated: [hotfix][table-planner] Use jdk 8 and java time modules for optional ser/de
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2529767 [hotfix][table-planner] Use jdk 8 and java time modules for optional ser/de
2529767 is described below
commit 2529767ca6708e45a4068b07349c81b15c7bb398
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Jan 31 17:29:25 2022 +0100
[hotfix][table-planner] Use jdk 8 and java time modules for optional ser/de
Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
.../nodes/exec/serde/DurationJsonDeserializer.java | 45 ---------------
.../nodes/exec/serde/DurationJsonSerializer.java | 44 ---------------
.../plan/nodes/exec/serde/JsonSerdeUtil.java | 9 ++-
.../exec/serde/LogicalWindowJsonDeserializer.java | 9 +--
.../nodes/exec/serde/DurationJsonSerdeTest.java | 65 ----------------------
5 files changed, 11 insertions(+), 161 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonDeserializer.java
deleted file mode 100644
index e327efd..0000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonDeserializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.serde;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-
-import java.io.IOException;
-import java.time.Duration;
-
-/**
- * JSON deserializer for {@link Duration}. refer to {@link DurationJsonSerializer} for serializer.
- */
-public class DurationJsonDeserializer extends StdDeserializer<Duration> {
- private static final long serialVersionUID = 1L;
-
- public DurationJsonDeserializer() {
- super(Duration.class);
- }
-
- @Override
- public Duration deserialize(
- JsonParser jsonParser, DeserializationContext deserializationContext)
- throws IOException {
- String duration = jsonParser.readValueAs(String.class);
- return Duration.parse(duration);
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonSerializer.java
deleted file mode 100644
index cb2d27e..0000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonSerializer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.serde;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
-
-import java.io.IOException;
-import java.time.Duration;
-
-/**
- * JSON serializer for {@link Duration}. refer to {@link DurationJsonDeserializer} for deserializer.
- */
-public class DurationJsonSerializer extends StdSerializer<Duration> {
- private static final long serialVersionUID = 1L;
-
- public DurationJsonSerializer() {
- super(Duration.class);
- }
-
- @Override
- public void serialize(
- Duration duration, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
- throws IOException {
- jsonGenerator.writeString(duration.toString());
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index 5e83e95..f82b52e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -45,11 +45,14 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.Module;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
@@ -60,7 +63,6 @@ import org.apache.calcite.rex.RexWindowBound;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
-import java.time.Duration;
import java.util.Optional;
/** An utility class that provide abilities for JSON serialization and deserialization. */
@@ -93,6 +95,9 @@ public class JsonSerdeUtil {
.getTypeFactory()
.withClassLoader(JsonSerdeUtil.class.getClassLoader()));
OBJECT_MAPPER_INSTANCE.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
+ OBJECT_MAPPER_INSTANCE.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
+ OBJECT_MAPPER_INSTANCE.registerModule(new Jdk8Module().configureAbsentsAsNulls(true));
+ OBJECT_MAPPER_INSTANCE.registerModule(new JavaTimeModule());
OBJECT_MAPPER_INSTANCE.registerModule(createFlinkTableJacksonModule());
}
@@ -136,7 +141,6 @@ public class JsonSerdeUtil {
// RexNode is used in many exec nodes, so we register its serializer directly here
module.addSerializer(new RexNodeJsonSerializer());
module.addSerializer(new AggregateCallJsonSerializer());
- module.addSerializer(new DurationJsonSerializer());
module.addSerializer(new ChangelogModeJsonSerializer());
module.addSerializer(new LogicalWindowJsonSerializer());
module.addSerializer(new RexWindowBoundJsonSerializer());
@@ -164,7 +168,6 @@ public class JsonSerdeUtil {
// with RexLiteral instead of RexNode.
module.addDeserializer(RexLiteral.class, (StdDeserializer) new RexNodeJsonDeserializer());
module.addDeserializer(AggregateCall.class, new AggregateCallJsonDeserializer());
- module.addDeserializer(Duration.class, new DurationJsonDeserializer());
module.addDeserializer(ChangelogMode.class, new ChangelogModeJsonDeserializer());
module.addDeserializer(LogicalWindow.class, new LogicalWindowJsonDeserializer());
module.addDeserializer(RexWindowBound.class, new RexWindowBoundJsonDeserializer());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
index af5cffb..d6d9151 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
@@ -37,6 +37,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
import java.io.IOException;
import java.time.Duration;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.traverse;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer.FIELD_NAME_ALIAS;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer.FIELD_NAME_FIELD_INDEX;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer.FIELD_NAME_FIELD_NAME;
@@ -83,7 +84,7 @@ public class LogicalWindowJsonDeserializer extends StdDeserializer<LogicalWindow
if (isTimeTumblingWindow) {
Duration size =
deserializationContext.readValue(
- jsonNode.get(FIELD_NAME_SIZE).traverse(jsonParser.getCodec()),
+ traverse(jsonNode.get(FIELD_NAME_SIZE), jsonParser.getCodec()),
Duration.class);
return new TumblingGroupWindow(
alias, timeField, new ValueLiteralExpression(size));
@@ -97,11 +98,11 @@ public class LogicalWindowJsonDeserializer extends StdDeserializer<LogicalWindow
if (isTimeSlidingWindow) {
Duration size =
deserializationContext.readValue(
- jsonNode.get(FIELD_NAME_SIZE).traverse(jsonParser.getCodec()),
+ traverse(jsonNode.get(FIELD_NAME_SIZE), jsonParser.getCodec()),
Duration.class);
Duration slide =
deserializationContext.readValue(
- jsonNode.get(FIELD_NAME_SLIDE).traverse(jsonParser.getCodec()),
+ traverse(jsonNode.get(FIELD_NAME_SLIDE), jsonParser.getCodec()),
Duration.class);
return new SlidingGroupWindow(
alias,
@@ -120,7 +121,7 @@ public class LogicalWindowJsonDeserializer extends StdDeserializer<LogicalWindow
case KIND_SESSION:
Duration gap =
deserializationContext.readValue(
- jsonNode.get(FIELD_NAME_GAP).traverse(jsonParser.getCodec()),
+ traverse(jsonNode.get(FIELD_NAME_GAP), jsonParser.getCodec()),
Duration.class);
return new SessionGroupWindow(alias, timeField, new ValueLiteralExpression(gap));
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonSerdeTest.java
deleted file mode 100644
index b175a16..0000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DurationJsonSerdeTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.serde;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/** Tests for {@link Duration} serialization and deserialization. */
-@RunWith(Parameterized.class)
-public class DurationJsonSerdeTest {
-
- @Parameterized.Parameter public Duration duration;
-
- @Test
- public void testDurationSerde() throws IOException {
- ObjectMapper mapper = JsonSerdeUtil.getObjectMapper();
-
- StringWriter writer = new StringWriter(100);
- try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
- gen.writeObject(duration);
- }
- String json = writer.toString();
- Duration actual = mapper.readValue(json, Duration.class);
- assertEquals(duration, actual);
- }
-
- @Parameterized.Parameters(name = "{0}")
- public static List<Duration> testData() {
- return Arrays.asList(
- Duration.ofNanos(1234567890),
- Duration.ofMillis(123456789),
- Duration.ofSeconds(12345),
- Duration.ofMinutes(123),
- Duration.ofHours(5),
- Duration.ofDays(10));
- }
-}