You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/03/31 02:58:02 UTC
[incubator-inlong] branch master updated: [INLONG-3486][Sort] Data with Timestamp/Date type are written wrongly when using parquet format
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3697916 [INLONG-3486][Sort] Data with Timestamp/Date type are written wrongly when using parquet format
3697916 is described below
commit 3697916d8fc52f147c5a0667e2bbe0d0c6912417
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Thu Mar 31 10:57:55 2022 +0800
[INLONG-3486][Sort] Data with Timestamp/Date type are written wrongly when using parquet format
---
.../hive/formats/parquet/ParquetRowWriter.java | 41 +++++++++++++++++++---
.../formats/parquet/ParquetSchemaConverter.java | 3 +-
.../formats/parquet/ParquetBulkWriterTest.java | 6 ++--
3 files changed, 41 insertions(+), 9 deletions(-)
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
index b36ada8..08c4da2 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
@@ -18,10 +18,13 @@
package org.apache.inlong.sort.flink.hive.formats.parquet;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Date;
+import java.sql.Time;
import java.sql.Timestamp;
-import java.util.Date;
import java.util.Map;
-
+import java.util.concurrent.TimeUnit;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
@@ -35,6 +38,10 @@ import org.apache.parquet.schema.Type;
/** Writes a record to the Parquet API with the expected schema in order to be written to a file. */
public class ParquetRowWriter {
+ public static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
+ public static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
+ public static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+
static final String ARRAY_FIELD_NAME = "list";
static final String MAP_ENTITY_FIELD_NAME = "key_value";
@@ -106,10 +113,12 @@ public class ParquetRowWriter {
case DOUBLE:
return new DoubleWriter();
case DATE:
+ return (row, ordinal) -> recordConsumer.addInteger(
+ (int) ((Date) row.getField(ordinal)).toLocalDate().toEpochDay());
case TIME_WITHOUT_TIME_ZONE:
- return (row, ordinal) -> recordConsumer.addInteger((int) ((Date) row.getField(ordinal)).getTime());
+ return (row, ordinal) -> recordConsumer.addInteger((int) ((Time) row.getField(ordinal)).getTime());
case TIMESTAMP_WITHOUT_TIME_ZONE:
- return (row, ordinal) -> recordConsumer.addLong(((Timestamp) row.getField(ordinal)).getTime());
+ return new TimestampWriter();
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
@@ -203,6 +212,30 @@ public class ParquetRowWriter {
}
}
+ private class TimestampWriter implements FieldWriter {
+ // Writes the row's java.sql.Timestamp field as a Parquet INT96 binary produced by timestampToInt96.
+ @Override
+ public void write(Row row, int ordinal) {
+ recordConsumer.addBinary(ParquetRowWriter.timestampToInt96((Timestamp) row.getField(ordinal)));
+ }
+ }
+
+ /** Encodes a timestamp as Parquet INT96: 8-byte nanos-of-day then 4-byte Julian day, little-endian. */
+ public static Binary timestampToInt96(Timestamp timestamp) {
+ long mills = timestamp.getTime();
+ // floorDiv/floorMod (not / and %) keep pre-1970 instants on the right day with a non-negative time of day.
+ int julianDay = (int) (Math.floorDiv(mills, MILLIS_IN_DAY) + JULIAN_EPOCH_OFFSET_DAYS);
+ long secondsOfDay = Math.floorMod(mills, MILLIS_IN_DAY) / 1000;
+ // getNanos() already holds the whole sub-second part, so only full seconds are taken from mills.
+ long nanosOfDay = secondsOfDay * NANOS_PER_SECOND + timestamp.getNanos();
+ ByteBuffer buf = ByteBuffer.allocate(12);
+ buf.order(ByteOrder.LITTLE_ENDIAN);
+ buf.putLong(nanosOfDay);
+ buf.putInt(julianDay);
+ buf.flip();
+ return Binary.fromConstantByteBuffer(buf);
+ }
+
private class ArrayWriter implements FieldWriter {
private final LogicalType elementTypeFlink;
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java
index 4b7eaf5..54866fc 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java
@@ -601,8 +601,7 @@ public class ParquetSchemaConverter {
.as(OriginalType.TIME_MILLIS)
.named(name);
case TIMESTAMP_WITHOUT_TIME_ZONE:
- return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
- .as(OriginalType.TIMESTAMP_MILLIS)
+ return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
.named(name);
case ARRAY:
return Types.list(repetition)
diff --git a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
index 8f82cb3..311e7f8 100644
--- a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
+++ b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
@@ -94,7 +94,7 @@ public class ParquetBulkWriterTest {
(float) 1.1,
1.11,
new BigDecimal("123456789123456789"),
- new Date(0),
+ new Date(24 * 60 * 60 * 1000),
new Time(0),
new Timestamp(0),
testArray,
@@ -119,9 +119,9 @@ public class ParquetBulkWriterTest {
assertEquals(1.1, line.getFloat(6, 0), 0.01);
assertEquals(1.11, line.getDouble(7, 0), 0.001);
assertEquals("123456789123456789", line.getString(8, 0));
- assertEquals(0, line.getInteger(9, 0));
+ assertEquals(1, line.getInteger(9, 0));
assertEquals(0, line.getInteger(10, 0));
- assertEquals(0, line.getLong(11, 0));
+ assertEquals(ParquetRowWriter.timestampToInt96(new Timestamp(0)), line.getInt96(11, 0));
Group f13 = line.getGroup("f13", 0);
assertEquals(1, f13.getGroup(ARRAY_FIELD_NAME, 0).getInteger(0, 0));