You are viewing a plain-text version of this content; the canonical link was not preserved in this copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/14 06:22:10 UTC

[incubator-inlong] branch master updated: [INLONG-3113][Sort] Fix date and time related bugs (#3113) (#3114)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit was added to refs/heads/master by this push:
     new 2e878ec  [INLONG-3113][Sort] Fix date and time related bugs (#3113) (#3114)
2e878ec is described below

commit 2e878ec086179b55275d083d48df80f7ede7ec48
Author: Kevin Wen <ke...@gmail.com>
AuthorDate: Mon Mar 14 14:22:04 2022 +0800

    [INLONG-3113][Sort] Fix date and time related bugs (#3113) (#3114)
---
 .../flink/hive/partition/RowPartitionComputer.java   |  3 +--
 ...CustomDateFormatDeserializationSchemaWrapper.java | 20 +++-----------------
 .../CustomDateFormatSerializationSchemaWrapper.java  | 11 +++--------
 3 files changed, 7 insertions(+), 27 deletions(-)

diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/partition/RowPartitionComputer.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/partition/RowPartitionComputer.java
index 332c392..841a9bc 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/partition/RowPartitionComputer.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/partition/RowPartitionComputer.java
@@ -21,7 +21,6 @@ package org.apache.inlong.sort.flink.hive.partition;
 import static com.google.common.base.Preconditions.checkState;
 
 import java.io.Serializable;
-import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -131,7 +130,7 @@ public class RowPartitionComputer implements PartitionComputer<Row> {
 
         @Override
         public String transform(Object fieldValue) {
-            checkState(fieldValue instanceof Timestamp);
+            checkState(fieldValue instanceof Date);
             return dateFormat.format((Date) fieldValue);
         }
     }
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapper.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapper.java
index c3cc687..a3f94e4 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapper.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CustomDateFormatDeserializationSchemaWrapper.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.singletenant.flink.deserialization;
 
-import org.apache.commons.lang3.time.FastDateFormat;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting;
@@ -29,13 +28,9 @@ import org.apache.inlong.sort.formats.common.TimeFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 
 import java.io.IOException;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
 import java.text.ParseException;
 
 import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.isStandardTimestampFormat;
 
 public class CustomDateFormatDeserializationSchemaWrapper implements DeserializationSchema<Row> {
@@ -100,22 +95,13 @@ public class CustomDateFormatDeserializationSchemaWrapper implements Deserializa
     private Object convert(Object input, FormatInfo formatInfo) throws ParseException {
 
         if (formatInfo instanceof DateFormatInfo && !isStandardTimestampFormat(formatInfo)) {
-            checkState(input instanceof String);
-            java.util.Date date =
-                    FastDateFormat.getInstance(((DateFormatInfo) formatInfo).getFormat()).parse((String) input);
-            return new Date(date.getTime());
+            return ((DateFormatInfo) formatInfo).deserialize((String) input);
 
         } else if (formatInfo instanceof TimeFormatInfo && !isStandardTimestampFormat(formatInfo)) {
-            checkState(input instanceof String);
-            java.util.Date date =
-                    FastDateFormat.getInstance(((TimeFormatInfo) formatInfo).getFormat()).parse((String) input);
-            return new Time(date.getTime());
+            return ((TimeFormatInfo) formatInfo).deserialize((String) input);
 
         } else if (formatInfo instanceof TimestampFormatInfo && !isStandardTimestampFormat(formatInfo)) {
-            checkState(input instanceof String);
-            java.util.Date date =
-                    FastDateFormat.getInstance(((TimestampFormatInfo) formatInfo).getFormat()).parse((String) input);
-            return new Timestamp(date.getTime());
+            return ((TimestampFormatInfo) formatInfo).deserialize((String) input);
 
         } else {
             return input;
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapper.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapper.java
index f271e39..7b3070b 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapper.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CustomDateFormatSerializationSchemaWrapper.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.singletenant.flink.serialization;
 
-import org.apache.commons.lang3.time.FastDateFormat;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.types.Row;
@@ -31,7 +30,6 @@ import java.sql.Time;
 import java.sql.Timestamp;
 
 import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkState;
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.isStandardTimestampFormat;
 
 public class CustomDateFormatSerializationSchemaWrapper implements SerializationSchema<Row> {
@@ -72,16 +70,13 @@ public class CustomDateFormatSerializationSchemaWrapper implements Serialization
     private Object convert(Object input, FormatInfo formatInfo) {
 
         if (formatInfo instanceof DateFormatInfo && !isStandardTimestampFormat(formatInfo)) {
-            checkState(input instanceof Date);
-            return FastDateFormat.getInstance(((DateFormatInfo) formatInfo).getFormat()).format(input);
+            return ((DateFormatInfo) formatInfo).serialize((Date) input);
 
         } else if (formatInfo instanceof TimeFormatInfo && !isStandardTimestampFormat(formatInfo)) {
-            checkState(input instanceof Time);
-            return FastDateFormat.getInstance(((TimeFormatInfo) formatInfo).getFormat()).format(input);
+            return ((TimeFormatInfo) formatInfo).serialize((Time) input);
 
         } else if (formatInfo instanceof TimestampFormatInfo && !isStandardTimestampFormat(formatInfo)) {
-            checkState(input instanceof Timestamp);
-            return FastDateFormat.getInstance(((TimestampFormatInfo) formatInfo).getFormat()).format(input);
+            return ((TimestampFormatInfo) formatInfo).serialize((Timestamp) input);
 
         } else {
             return input;