You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/03/31 02:58:02 UTC

[incubator-inlong] branch master updated: [INLONG-3486][Sort] Data with Timestamp/Date type are written wrongly when using parquet format

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3697916  [INLONG-3486][Sort] Data with Timestamp/Date type are written wrongly when using parquet format
3697916 is described below

commit 3697916d8fc52f147c5a0667e2bbe0d0c6912417
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Thu Mar 31 10:57:55 2022 +0800

    [INLONG-3486][Sort] Data with Timestamp/Date type are written wrongly when using parquet format
---
 .../hive/formats/parquet/ParquetRowWriter.java     | 41 +++++++++++++++++++---
 .../formats/parquet/ParquetSchemaConverter.java    |  3 +-
 .../formats/parquet/ParquetBulkWriterTest.java     |  6 ++--
 3 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
index b36ada8..08c4da2 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
@@ -18,10 +18,13 @@
 
 package org.apache.inlong.sort.flink.hive.formats.parquet;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Date;
+import java.sql.Time;
 import java.sql.Timestamp;
-import java.util.Date;
 import java.util.Map;
-
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
@@ -35,6 +38,10 @@ import org.apache.parquet.schema.Type;
 /** Writes a record to the Parquet API with the expected schema in order to be written to a file. */
 public class ParquetRowWriter {
 
+    public static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
+    public static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
+    public static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+
     static final String ARRAY_FIELD_NAME = "list";
 
     static final String MAP_ENTITY_FIELD_NAME = "key_value";
@@ -106,10 +113,12 @@ public class ParquetRowWriter {
                 case DOUBLE:
                     return new DoubleWriter();
                 case DATE:
+                    return (row, ordinal) -> recordConsumer.addInteger(
+                            (int) ((Date) row.getField(ordinal)).toLocalDate().toEpochDay());
                 case TIME_WITHOUT_TIME_ZONE:
-                    return (row, ordinal) -> recordConsumer.addInteger((int) ((Date) row.getField(ordinal)).getTime());
+                    return (row, ordinal) -> recordConsumer.addInteger((int) ((Time) row.getField(ordinal)).getTime());
                 case TIMESTAMP_WITHOUT_TIME_ZONE:
-                    return (row, ordinal) -> recordConsumer.addLong(((Timestamp) row.getField(ordinal)).getTime());
+                    return new TimestampWriter();
                 default:
                     throw new UnsupportedOperationException("Unsupported type: " + type);
             }
@@ -203,6 +212,30 @@ public class ParquetRowWriter {
         }
     }
 
+    private class TimestampWriter implements FieldWriter {
+        @Override
+        public void write(Row row, int ordinal) {
+            // Timestamps are emitted as 12-byte INT96 binaries built by timestampToInt96.
+            final Timestamp value = (Timestamp) row.getField(ordinal);
+            recordConsumer.addBinary(timestampToInt96(value));
+        }
+    }
+
+    /** Encodes {@code timestamp} as a 12-byte INT96 binary: little-endian nanos-of-day, then Julian day. */
+    public static Binary timestampToInt96(Timestamp timestamp) {
+        long mills = timestamp.getTime();
+        // floorDiv/floorMod keep pre-1970 (negative) instants on the right day with non-negative nanos.
+        int julianDay = (int) (Math.floorDiv(mills, MILLIS_IN_DAY) + JULIAN_EPOCH_OFFSET_DAYS);
+        // getNanos() already holds the full sub-second fraction, so only whole seconds come from mills.
+        long nanosOfDay = (Math.floorMod(mills, MILLIS_IN_DAY) / 1000) * NANOS_PER_SECOND + timestamp.getNanos();
+
+        ByteBuffer buf = ByteBuffer.allocate(12);
+        buf.order(ByteOrder.LITTLE_ENDIAN);
+        buf.putLong(nanosOfDay);
+        buf.putInt(julianDay);
+        buf.flip();
+        return Binary.fromConstantByteBuffer(buf);
+    }
+
     private class ArrayWriter implements FieldWriter {
 
         private final LogicalType elementTypeFlink;
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java
index 4b7eaf5..54866fc 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java
@@ -601,8 +601,7 @@ public class ParquetSchemaConverter {
                         .as(OriginalType.TIME_MILLIS)
                         .named(name);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
-                        .as(OriginalType.TIMESTAMP_MILLIS)
+                return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
                         .named(name);
             case ARRAY:
                 return Types.list(repetition)
diff --git a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
index 8f82cb3..311e7f8 100644
--- a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
+++ b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
@@ -94,7 +94,7 @@ public class ParquetBulkWriterTest {
                 (float) 1.1,
                 1.11,
                 new BigDecimal("123456789123456789"),
-                new Date(0),
+                new Date(24 * 60 * 60 * 1000),
                 new Time(0),
                 new Timestamp(0),
                 testArray,
@@ -119,9 +119,9 @@ public class ParquetBulkWriterTest {
         assertEquals(1.1, line.getFloat(6, 0), 0.01);
         assertEquals(1.11, line.getDouble(7, 0), 0.001);
         assertEquals("123456789123456789", line.getString(8, 0));
-        assertEquals(0, line.getInteger(9, 0));
+        assertEquals(1, line.getInteger(9, 0));
         assertEquals(0, line.getInteger(10, 0));
-        assertEquals(0, line.getLong(11, 0));
+        assertEquals(ParquetRowWriter.timestampToInt96(new Timestamp(0)), line.getInt96(11, 0));
 
         Group f13 = line.getGroup("f13", 0);
         assertEquals(1, f13.getGroup(ARRAY_FIELD_NAME, 0).getInteger(0, 0));