Posted to commits@drill.apache.org by dz...@apache.org on 2023/04/18 15:26:43 UTC
[drill] 15/15: DRILL-8421: Parquet microsecond columns (#2793)
This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch 1.21
in repository https://gitbox.apache.org/repos/asf/drill.git
commit fa22d67022b49b47d16e84bfc0bd26b97e58fb11
Author: Peter Franzen <pe...@myire.org>
AuthorDate: Tue Apr 18 09:43:45 2023 +0200
DRILL-8421: Parquet microsecond columns (#2793)
* Read parquet TIME_MICROS columns as 64-bit values before truncating them to 32 bits
* Truncate parquet min and max metadata values for microsecond columns to milliseconds
* Express parquet TIME_MICROS metadata as Integer values
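The core of the fix is arithmetic: a TIME_MICROS value is a 64-bit microsecond count, and Integer.MAX_VALUE microseconds is only about 35 minutes 47 seconds, so the count has to be read with readLong() and divided by 1000 before it is narrowed to the 32-bit millisecond value Drill's TIME vectors hold. A minimal, hypothetical sketch of that conversion (illustrative class name, not Drill code):

    // Hypothetical sketch of the conversion this commit applies; not part of Drill.
    public final class MicrosecondConversionSketch {

      // Convert a TIME_MICROS value (microseconds since midnight) to Drill's
      // 32-bit millisecond TIME representation.
      static int timeMicrosToMillis(long micros) {
        // Divide first, then narrow: the microsecond count itself may not fit in an int.
        return (int) (micros / 1000);
      }

      public static void main(String[] args) {
        long secondsOfDay = 22L * 3600 + 12 * 60 + 41;       // 22:12:41
        long micros = secondsOfDay * 1_000_000L + 123_456L;  // 79_961_123_456 microseconds, larger than Integer.MAX_VALUE
        System.out.println(timeMicrosToMillis(micros));      // prints 79961123 (milliseconds)
      }
    }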
---
.../store/parquet/ParquetTableMetadataUtils.java | 3 +-
.../NullableFixedByteAlignedReaders.java | 2 +-
.../ParquetFixedWidthDictionaryReaders.java | 6 +-
.../parquet/metadata/FileMetadataCollector.java | 18 ++
.../exec/store/parquet/TestMicrosecondColumns.java | 355 +++++++++++++++++++++
.../test/resources/parquet/microseconds.parquet | Bin 0 -> 871 bytes
6 files changed, 379 insertions(+), 5 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
index d1505d1120..a25b1fb5c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
@@ -344,10 +344,11 @@ public class ParquetTableMetadataUtils {
case INT64:
if (originalType == OriginalType.DECIMAL) {
return BigInteger.valueOf(getLong(value));
+ } else if (originalType == OriginalType.TIME_MICROS) {
+ return getInt(value);
} else {
return getLong(value);
}
-
case FLOAT:
return getFloat(value);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 9b82620c3b..b678bb8b61 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -193,7 +193,7 @@ public class NullableFixedByteAlignedReaders {
protected void readField(long recordsToReadInThisPass) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
for (int i = 0; i < recordsToReadInThisPass; i++) {
- valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readInteger() / 1000);
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (valReader.readLong() / 1000));
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index ea13a4c42d..2a916ba007 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -168,13 +168,13 @@ public class ParquetFixedWidthDictionaryReaders {
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
for (int i = 0; i < recordsReadInThisIteration; i++) {
- valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readInteger() / 1000);
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (valReader.readLong() / 1000));
}
} else {
int dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
for (int i = 0; i < recordsReadInThisIteration; i++) {
- int value = pageReader.pageData.getInt((int) readStartInBytes + i * dataTypeLengthInBytes);
- valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, value / 1000);
+ long value = pageReader.pageData.getLong((int) readStartInBytes + i * dataTypeLengthInBytes);
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (value / 1000));
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
index c79996bc6a..9950171f97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
@@ -208,6 +208,12 @@ public class FileMetadataCollector {
minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue);
maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue);
}
+ if (isMicrosecondColumnType(columnTypeMetadata.originalType)) {
+ // DRILL-8421: truncate the min/max of microsecond columns to milliseconds, otherwise the
+ // initial scanning of files when filtering will compare to the wrong values.
+ minValue = truncateMicros(minValue);
+ maxValue = truncateMicros(maxValue);
+ }
}
long numNulls = stats.getNumNulls();
Metadata_V4.ColumnMetadata_v4 columnMetadata = new Metadata_V4.ColumnMetadata_v4(columnTypeMetadata.name,
@@ -218,6 +224,18 @@ public class FileMetadataCollector {
columnTypeInfo.put(columnTypeMetadataKey, columnTypeMetadata);
}
+ private static boolean isMicrosecondColumnType(OriginalType columnType) {
+ return columnType == OriginalType.TIME_MICROS || columnType == OriginalType.TIMESTAMP_MICROS;
+ }
+
+ private static Object truncateMicros(Object microSeconds) {
+ if (microSeconds instanceof Number) {
+ return Long.valueOf(((Number) microSeconds).longValue() / 1000);
+ } else {
+ return microSeconds;
+ }
+ }
+
/**
* Get the host affinity for a row group.
*
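The FileMetadataCollector change above addresses a scale mismatch in filter pushdown: the parquet footer stores TIME_MICROS and TIMESTAMP_MICROS statistics as microsecond counts, while the constants Drill filters against are milliseconds, so an untruncated min/max looks roughly a thousand times later than it should. A rough, hypothetical illustration with approximate numbers (not values taken from the commit):

    // Hypothetical illustration of the microsecond/millisecond scale mismatch
    // that the min/max truncation above guards against; not code from the commit.
    public class MicrosStatsMismatchSketch {
      public static void main(String[] args) {
        // TIMESTAMP_MICROS statistic as it appears in a parquet footer,
        // here for 2023-02-10T00:32:58.174711Z (microseconds since the epoch):
        long maxInFooterMicros = 1_675_989_178_174_711L;
        // A filter constant expressed in milliseconds, roughly the year 2502 (approximate):
        long filterMillis = 16_800_000_000_000L;

        // Comparing microseconds against milliseconds makes the 2023 statistic look
        // far later than the year-2502 bound:
        System.out.println(maxInFooterMicros > filterMillis);        // true  (wrong scale)
        // Truncating the statistic to milliseconds puts both sides on the same scale:
        System.out.println(maxInFooterMicros / 1000 > filterMillis); // false (correct)
      }
    }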
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java
new file mode 100644
index 0000000000..01832b446e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+
+import org.apache.drill.categories.ParquetTest;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category({ParquetTest.class, UnlikelyTest.class})
+public class TestMicrosecondColumns extends ClusterTest {
+
+ private static final String TIME_FORMAT = "HH:mm:ss.SSS";
+ private static final String TO_TIME_TEMPLATE = "TO_TIME('%s', 'HH:mm:ss.SSS')";
+ private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern(TIME_FORMAT);
+ private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+ private static final String TO_TIMESTAMP_TEMPLATE = "TO_TIMESTAMP('%s', 'yyyy-MM-dd''T''HH:mm:ss.SSS')";
+ private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern(TIMESTAMP_FORMAT);
+
+ // The parquet file used in the test cases can be generated by calling createParquetTestFile().
+ private static final String DATAFILE = "cp.`parquet/microseconds.parquet`";
+
+ // Schema used to generate the parquet test file.
+ private static final String SCHEMA =
+ "message ParquetMicrosecondDataTypes { \n" +
+ " required int32 rowKey; \n" +
+ " required int64 _TIME_MICROS_int64 ( TIME_MICROS ) ; \n" +
+ " required int64 _TIMESTAMP_MICROS_int64 ( TIMESTAMP_MICROS ) ; \n" +
+ "} \n";
+
+ // Test values for the _TIME_MICROS_int64 field. Will be written to the test parquet file when
+ // calling createParquetTestFile().
+ private static final long[] TIME_MICROS_VALUES = {
+ toMicrosecondTime(0, 32, 58, 174711),
+ toMicrosecondTime(9, 0, 22, 654321),
+ toMicrosecondTime(22, 12, 41, 123456)
+ };
+
+ // Test values for the _TIMESTAMP_MICROS_int64 field. Will be written to the test parquet file
+ // when calling createParquetTestFile().
+ private static final long[] TIMESTAMP_MICROS_VALUES = {
+ toMicrosecondTimestamp(2021, 8, 1, 22, 12, 41, 123456),
+ toMicrosecondTimestamp(2022, 5, 6, 9, 0, 22, 654321),
+ toMicrosecondTimestamp(2023, 2, 10, 0, 32, 58, 174711)
+ };
+
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+ }
+
+
+ @Test
+ public void testSelectTimeColumns() throws Exception {
+ // DRILL-8423
+ String query = "select _TIME_MICROS_int64 as t from %s";
+ testBuilder()
+ .sqlQuery(query, DATAFILE)
+ .unOrdered()
+ .baselineColumns("t")
+ .baselineValues(toLocalTime(TIME_MICROS_VALUES[0]))
+ .baselineValues(toLocalTime(TIME_MICROS_VALUES[1]))
+ .baselineValues(toLocalTime(TIME_MICROS_VALUES[2]))
+ .go();
+ }
+
+
+ @Test
+ public void testLessThanSmallestTime() throws Exception {
+ // No time values should be less than the smallest value in the parquet file
+ int expectedCount = 0;
+ String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[0]);
+ executeFilterQuery("_TIME_MICROS_int64 < " + timeExpr, expectedCount);
+ }
+
+
+ @Test
+ public void testLessThanMidTime() throws Exception {
+ // The smallest time value should be less than the middle value in the parquet file
+ int expectedCount = 1;
+ String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[1]);
+ executeFilterQuery("_TIME_MICROS_int64 < " + timeExpr, expectedCount);
+ }
+
+
+ @Test
+ public void testLessThanLargestTime() throws Exception {
+ // The smallest and middle time values should be less than the largest value in the parquet file
+ int expectedCount = 2;
+ String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[2]);
+ executeFilterQuery("_TIME_MICROS_int64 < " + timeExpr, expectedCount);
+ }
+
+
+ @Test
+ public void testGreaterThanSmallestTime() throws Exception {
+ // The middle and largest time values should be greater than the smallest value in the parquet
+ // file
+ int expectedCount = 2;
+ String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[0]);
+ executeFilterQuery("_TIME_MICROS_int64 > " + timeExpr, expectedCount);
+ }
+
+
+ @Test
+ public void testGreaterThanMidTime() throws Exception {
+ // The largest time value should be greater than the middle value in the parquet file
+ int expectedCount = 1;
+ String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[1]);
+ executeFilterQuery("_TIME_MICROS_int64 > " + timeExpr, expectedCount);
+ }
+
+
+ @Test
+ public void testGreaterThanLargestTime() throws Exception {
+ // No time value should be greater than the largest value in the parquet file
+ int expectedCount = 0;
+ String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[2]);
+ executeFilterQuery("_TIME_MICROS_int64 > " + timeExpr, expectedCount);
+ }
+
+
+ @Test
+ public void testTimeRange() throws Exception {
+ // The middle time test value should be greater than the smallest value and less than the
+ // largest in the parquet file
+ int expectedCount = 1;
+ String lower = createToTimeFragment(TIME_MICROS_VALUES[0]);
+ String upper = createToTimeFragment(TIME_MICROS_VALUES[2]);
+ executeFilterQuery("_TIME_MICROS_int64 > " + lower + " and _TIME_MICROS_int64 < " + upper, expectedCount);
+ }
+
+
+ @Test
+ public void testSelectTimestampColumns() throws Exception {
+ String query = "select _TIMESTAMP_MICROS_int64 as t from %s";
+ testBuilder()
+ .sqlQuery(query, DATAFILE)
+ .unOrdered()
+ .baselineColumns("t")
+ .baselineValues(toLocalDateTime(TIMESTAMP_MICROS_VALUES[0]))
+ .baselineValues(toLocalDateTime(TIMESTAMP_MICROS_VALUES[1]))
+ .baselineValues(toLocalDateTime(TIMESTAMP_MICROS_VALUES[2]))
+ .go();
+ }
+
+
+ @Test
+ public void testLessThanSmallestTimestamp() throws Exception {
+ // No timestamp values should be less than the smallest value in the parquet file
+ int expectedCount = 0;
+ String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[0]);
+ executeFilterQuery("_TIMESTAMP_MICROS_int64 < " + timestampExpr, expectedCount);
+ }
+
+
+ @Test
+ public void testLessThanMidTimestamp() throws Exception {
+ // The smallest timestamp value should be less than the middle value in the parquet file
+ int expectedCount = 1;
+ String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[1]);
+ executeFilterQuery("_TIMESTAMP_MICROS_int64 < " + timestampExpr, expectedCount);
+ }
+
+
+ @Test
+ public void testLessThanLargestTimestamp() throws Exception {
+ // The smallest and middle timestamp values should be less than the largest value in the parquet
+ // file
+ int expectedCount = 2;
+ String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[2]);
+ executeFilterQuery("_TIMESTAMP_MICROS_int64 < " + timestampExpr, expectedCount);
+ }
+
+
+ @Test
+ public void testLessThanTimestampLongIntoTheFuture() throws Exception {
+ // All test timestamps should be less than a timestamp several hundred years into the future
+ // See https://issues.apache.org/jira/browse/DRILL-8421
+ int expectedCount = 3;
+ String whereClause = "_TIMESTAMP_MICROS_int64 < TO_TIMESTAMP('2502-04-04 00:00:00', 'yyyy-MM-dd HH:mm:ss')";
+ executeFilterQuery(whereClause, expectedCount);
+ }
+
+
+ @Test
+ public void testGreaterThanSmallestTimestamp() throws Exception {
+ // The middle and largest timestamp values should be greater than the smallest value in the
+ // parquet file
+ int expectedCount = 2;
+ String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[0]);
+ executeFilterQuery("_TIMESTAMP_MICROS_int64 > " + timestampExpr, expectedCount);
+ }
+
+
+ @Test
+ public void testGreaterThanMidTimestamp() throws Exception {
+ // The largest timestamp value should be greater than the middle value in the parquet file
+ int expectedCount = 1;
+ String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[1]);
+ executeFilterQuery("_TIMESTAMP_MICROS_int64 > " + timestampExpr, expectedCount);
+ }
+
+
+ @Test
+ public void testGreaterThanLargestTimestamp() throws Exception {
+ // No timestamp values should be greater than the largest value in the parquet file
+ int expectedCount = 0;
+ String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[2]);
+ executeFilterQuery("_TIMESTAMP_MICROS_int64 > " + timestampExpr, expectedCount);
+ }
+
+
+ @Test
+ public void testGreaterThanTimestampLongIntoTheFuture() throws Exception {
+ // No test timestamps should be greater than a timestamp several hundred years into the future
+ // See https://issues.apache.org/jira/browse/DRILL-8421
+ int expectedCount = 0;
+ String whereClause = "_TIMESTAMP_MICROS_int64 > TO_TIMESTAMP('2502-04-04 00:00:00', 'yyyy-MM-dd HH:mm:ss')";
+ executeFilterQuery(whereClause, expectedCount);
+ }
+
+
+ @Test
+ public void testTimestampRange() throws Exception {
+ // The middle timestamp test value should be greater than the smallest value and less than the
+ // largest in the parquet file
+ String lower = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[0]);
+ String upper = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[2]);
+ executeFilterQuery("_TIMESTAMP_MICROS_int64 > " + lower + " and _TIMESTAMP_MICROS_int64 < " + upper, 1);
+ }
+
+
+ private void executeFilterQuery(String whereClause, long expectedCount) throws Exception {
+ String query = "select count(*) as c from %s where " + whereClause;
+ testBuilder()
+ .sqlQuery(query, DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(expectedCount)
+ .go();
+ }
+
+
+ public static void createParquetTestFile(String filePath) throws IOException {
+
+ MessageType messageType = MessageTypeParser.parseMessageType(SCHEMA);
+ GroupWriteSupport.setSchema(messageType, ParquetSimpleTestFileGenerator.conf);
+ SimpleGroupFactory groupFactory = new SimpleGroupFactory(messageType);
+
+ try (ParquetWriter<Group> writer = createParquetWriter(filePath)) {
+ for (int i=0; i<TIME_MICROS_VALUES.length; i++) {
+ writer.write(
+ groupFactory.newGroup()
+ .append("rowKey", i+1)
+ .append("_TIME_MICROS_int64", TIME_MICROS_VALUES[i])
+ .append("_TIMESTAMP_MICROS_int64", TIMESTAMP_MICROS_VALUES[i])
+ );
+ }
+ }
+ }
+
+
+ private static ParquetWriter<Group> createParquetWriter(String filePath) throws IOException {
+ return
+ ExampleParquetWriter.builder(ParquetSimpleTestFileGenerator.initFile(filePath))
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .withCompressionCodec(CompressionCodecName.GZIP)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
+ .withConf(ParquetSimpleTestFileGenerator.conf)
+ .build();
+ }
+
+
+ private static String createToTimeFragment(long micros) {
+ return String.format(TO_TIME_TEMPLATE, TIME_FORMATTER.format(toLocalTime(micros)));
+ }
+
+
+ private static String createToTimestampFragment(long micros) {
+ return String.format(TO_TIMESTAMP_TEMPLATE, TIMESTAMP_FORMATTER.format(toLocalDateTime(micros)));
+ }
+
+
+ private static LocalTime toLocalTime(long micros) {
+ return LocalTime.ofNanoOfDay((micros/1000L) * 1000_000L);
+ }
+
+
+ private static LocalDateTime toLocalDateTime(long micros) {
+ return LocalDateTime.ofInstant(Instant.ofEpochMilli(micros/1000L), ZoneOffset.ofHours(0));
+ }
+
+
+ private static long toMicrosecondTime(int hour, int minute, int second, int microOfSecond) {
+ return LocalTime.of(hour, minute, second, microOfSecond*1000).toNanoOfDay() / 1000L;
+ }
+
+
+ private static long toMicrosecondTimestamp(
+ int year,
+ int month,
+ int dayOfMonth,
+ int hour,
+ int minute,
+ int second,
+ int microOfSecond) {
+
+ Instant instant =
+ LocalDateTime
+ .of(year, month, dayOfMonth, hour, minute, second, microOfSecond*1000)
+ .toInstant(ZoneOffset.ofHours(0));
+
+ return instant.getEpochSecond() * 1000_000L + instant.getNano() / 1000L;
+ }
+}
diff --git a/exec/java-exec/src/test/resources/parquet/microseconds.parquet b/exec/java-exec/src/test/resources/parquet/microseconds.parquet
new file mode 100644
index 0000000000..7bac7aa26d
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/microseconds.parquet differ