Posted to commits@drill.apache.org by dz...@apache.org on 2023/04/18 15:26:43 UTC

[drill] 15/15: DRILL-8421: Parquet microsecond columns (#2793)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch 1.21
in repository https://gitbox.apache.org/repos/asf/drill.git

commit fa22d67022b49b47d16e84bfc0bd26b97e58fb11
Author: Peter Franzen <pe...@myire.org>
AuthorDate: Tue Apr 18 09:43:45 2023 +0200

    DRILL-8421: Parquet microsecond columns (#2793)
    
    * Read parquet TIME_MICROS columns as 64-bit values before truncating to 32 bits
    * Truncate parquet min and max metadata values for microsecond columns to milliseconds
    * Express parquet TIME_MICROS metadata as Integer values
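    
    TIME_MICROS stores microseconds since midnight in an int64, and a full
    day of microseconds does not fit in 32 bits. A minimal sketch of the
    arithmetic, with illustrative values (not part of the patch):
    
        // Why the raw page value must be read as a long before truncation.
        long endOfDayMicros = 86_399_999_999L;   // 23:59:59.999999
        // endOfDayMicros > Integer.MAX_VALUE (2_147_483_647), so a 32-bit
        // read corrupts it. Dividing down to milliseconds first is safe:
        int endOfDayMillis = (int) (endOfDayMicros / 1000L);  // 86_399_999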
---
 .../store/parquet/ParquetTableMetadataUtils.java   |   3 +-
 .../NullableFixedByteAlignedReaders.java           |   2 +-
 .../ParquetFixedWidthDictionaryReaders.java        |   6 +-
 .../parquet/metadata/FileMetadataCollector.java    |  18 ++
 .../exec/store/parquet/TestMicrosecondColumns.java | 355 +++++++++++++++++++++
 .../test/resources/parquet/microseconds.parquet    | Bin 0 -> 871 bytes
 6 files changed, 379 insertions(+), 5 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
index d1505d1120..a25b1fb5c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
@@ -344,10 +344,11 @@ public class ParquetTableMetadataUtils {
         case INT64:
           if (originalType == OriginalType.DECIMAL) {
             return BigInteger.valueOf(getLong(value));
+          } else if (originalType == OriginalType.TIME_MICROS) {
+            return getInt(value);
           } else {
             return getLong(value);
           }
-
         case FLOAT:
           return getFloat(value);
 
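
Drill materializes TIME columns as 32-bit millisecond values, so once a
TIME_MICROS min/max statistic has been truncated to milliseconds (see the
FileMetadataCollector change below) it must also be surfaced as an Integer
to match the type used in filter comparisons. A sketch of the narrowing,
assuming the incoming value already holds a millisecond count:

    // Sketch only: a millisecond time-of-day always fits in an int.
    long truncatedMillis = 79_961_123L;            // 22:12:41.123
    Integer metadataValue = (int) truncatedMillis; // < Integer.MAX_VALUE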
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 9b82620c3b..b678bb8b61 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -193,7 +193,7 @@ public class NullableFixedByteAlignedReaders {
     protected void readField(long recordsToReadInThisPass) {
       ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
       for (int i = 0; i < recordsToReadInThisPass; i++) {
-        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readInteger() / 1000);
+        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (valReader.readLong() / 1000));
       }
     }
   }
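
Applying the same conversion to one of the values in the new test file
shows what the 64-bit read fixes (illustrative, not part of the patch):

    // 22:12:41.123456 from the test data, as microseconds since midnight.
    long timeMicros = 79_961_123_456L;
    // timeMicros overflows an int, but its millisecond value does not.
    int timeMillis = (int) (timeMicros / 1000L);   // 79_961_123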
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index ea13a4c42d..2a916ba007 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -168,13 +168,13 @@ public class ParquetFixedWidthDictionaryReaders {
       if (recordsRequireDecoding()) {
         ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
         for (int i = 0; i < recordsReadInThisIteration; i++) {
-          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readInteger() / 1000);
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (valReader.readLong() / 1000));
         }
       } else {
         int dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
         for (int i = 0; i < recordsReadInThisIteration; i++) {
-          int value = pageReader.pageData.getInt((int) readStartInBytes + i * dataTypeLengthInBytes);
-          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, value / 1000);
+          long value = pageReader.pageData.getLong((int) readStartInBytes + i * dataTypeLengthInBytes);
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (value / 1000));
         }
       }
     }
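
A note on the non-dictionary branch above: the page holds 8-byte values,
so the stride and the read width now agree. A sketch of the offset
arithmetic, assuming dataTypeLengthInBits is 64 for this reader:

    // The i-th value starts at readStartInBytes + i * 8.
    int dataTypeLengthInBytes = (int) Math.ceil(64 / 8.0);  // 8
    // The old getInt() read only the low 4 bytes of each 8-byte value;
    // getLong() consumes the whole value before it is truncated to millis.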
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
index c79996bc6a..9950171f97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
@@ -208,6 +208,12 @@ public class FileMetadataCollector {
           minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue);
           maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue);
         }
+        if (isMicrosecondColumnType(columnTypeMetadata.originalType)) {
+          // DRILL-8421: truncate the min/max of microsecond columns to milliseconds, otherwise the
+          // initial scanning of files when filtering will compare to the wrong values.
+          minValue = truncateMicros(minValue);
+          maxValue = truncateMicros(maxValue);
+        }
       }
       long numNulls = stats.getNumNulls();
       Metadata_V4.ColumnMetadata_v4 columnMetadata = new Metadata_V4.ColumnMetadata_v4(columnTypeMetadata.name,
@@ -218,6 +224,18 @@ public class FileMetadataCollector {
     columnTypeInfo.put(columnTypeMetadataKey, columnTypeMetadata);
   }
 
+  private static boolean isMicrosecondColumnType(OriginalType columnType) {
+    return columnType == OriginalType.TIME_MICROS || columnType == OriginalType.TIMESTAMP_MICROS;
+  }
+
+  private static Object truncateMicros(Object microSeconds) {
+    if (microSeconds instanceof Number) {
+      return Long.valueOf(((Number) microSeconds).longValue() / 1000);
+    } else {
+      return microSeconds;
+    }
+  }
+
   /**
    * Get the host affinity for a row group.
    *
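
To see why the metadata truncation matters for filter pushdown, compare a
microsecond statistic against a millisecond filter constant; the magnitudes
below are rough and purely illustrative:

    // A 2021 timestamp is ~1.6e15 in microseconds but ~1.6e12 in
    // milliseconds; 2502-04-04 is ~1.7e13 as epoch milliseconds.
    long minStatMicros = 1_600_000_000_000_000L;  // footer min, untruncated
    long filterMillis  =    17_000_000_000_000L;  // far-future constant
    // Untruncated: minStatMicros > filterMillis, so "ts < constant" prunes
    // every row group and returns zero rows (the DRILL-8421 symptom).
    long minStatMillis = minStatMicros / 1000L;   // ~1.6e12
    // Truncated: minStatMillis < filterMillis, so row groups are retained.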
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java
new file mode 100644
index 0000000000..01832b446e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+
+import org.apache.drill.categories.ParquetTest;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category({ParquetTest.class, UnlikelyTest.class})
+public class TestMicrosecondColumns extends ClusterTest {
+
+  private static final String TIME_FORMAT = "HH:mm:ss.SSS";
+  private static final String TO_TIME_TEMPLATE = "TO_TIME('%s', 'HH:mm:ss.SSS')";
+  private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern(TIME_FORMAT);
+  private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+  private static final String TO_TIMESTAMP_TEMPLATE = "TO_TIMESTAMP('%s', 'yyyy-MM-dd''T''HH:mm:ss.SSS')";
+  private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern(TIMESTAMP_FORMAT);
+
+  // The parquet file used in the test cases. It can be generated by calling createParquetTestFile().
+  private static final String DATAFILE = "cp.`parquet/microseconds.parquet`";
+
+  // Schema used to generate the parquet test file.
+  private static final String SCHEMA =
+      "message ParquetMicrosecondDataTypes { \n" +
+          "  required int32 rowKey; \n" +
+          "  required int64 _TIME_MICROS_int64  ( TIME_MICROS ) ; \n" +
+          "  required int64 _TIMESTAMP_MICROS_int64  ( TIMESTAMP_MICROS ) ; \n" +
+          "} \n";
+
+  // Test values for the _TIME_MICROS_int64 field, written to the test parquet file by
+  // createParquetTestFile().
+  private static final long[] TIME_MICROS_VALUES = {
+      toMicrosecondTime(0, 32, 58, 174711),
+      toMicrosecondTime(9, 0, 22, 654321),
+      toMicrosecondTime(22, 12, 41, 123456)
+  };
+
+  // Test values for the _TIMESTAMP_MICROS_int64 field, written to the test parquet file by
+  // createParquetTestFile().
+  private static final long[] TIMESTAMP_MICROS_VALUES = {
+      toMicrosecondTimestamp(2021, 8, 1, 22, 12, 41, 123456),
+      toMicrosecondTimestamp(2022, 5, 6, 9, 0, 22, 654321),
+      toMicrosecondTimestamp(2023, 2, 10, 0, 32, 58, 174711)
+  };
+
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+  }
+
+
+  @Test
+  public void testSelectTimeColumns() throws Exception {
+    // DRILL-8423
+    String query = "select _TIME_MICROS_int64 as t from %s";
+    testBuilder()
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("t")
+        .baselineValues(toLocalTime(TIME_MICROS_VALUES[0]))
+        .baselineValues(toLocalTime(TIME_MICROS_VALUES[1]))
+        .baselineValues(toLocalTime(TIME_MICROS_VALUES[2]))
+        .go();
+  }
+
+
+  @Test
+  public void testLessThanSmallestTime() throws Exception {
+    // No time values should be less than the smallest value in the parquet file
+    int expectedCount = 0;
+    String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[0]);
+    executeFilterQuery("_TIME_MICROS_int64 < " + timeExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testLessThanMidTime() throws Exception {
+    // The smallest time value should be less than the middle value in the parquet file
+    int expectedCount = 1;
+    String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[1]);
+    executeFilterQuery("_TIME_MICROS_int64 < " + timeExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testLessThanLargestTime() throws Exception {
+    // The smallest and middle time values should be less than the largest value in the parquet file
+    int expectedCount = 2;
+    String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[2]);
+    executeFilterQuery("_TIME_MICROS_int64 < " + timeExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanSmallestTime() throws Exception {
+    // The middle and largest time values should be greater than the smallest value in the parquet
+    // file
+    int expectedCount = 2;
+    String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[0]);
+    executeFilterQuery("_TIME_MICROS_int64 > " + timeExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanMidTime() throws Exception {
+    // The largest time value should be greater than the middle value in the parquet file
+    int expectedCount = 1;
+    String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[1]);
+    executeFilterQuery("_TIME_MICROS_int64 > " + timeExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanLargestTime() throws Exception {
+    // No time value should be greater than the largest value in the parquet file
+    int expectedCount = 0;
+    String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[2]);
+    executeFilterQuery("_TIME_MICROS_int64 > " + timeExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testTimeRange() throws Exception {
+    // The middle time test value should be greater than the smallest value and less than the
+    // largest in the parquet file
+    int expectedCount = 1;
+    String lower = createToTimeFragment(TIME_MICROS_VALUES[0]);
+    String upper = createToTimeFragment(TIME_MICROS_VALUES[2]);
+    executeFilterQuery("_TIME_MICROS_int64 > " + lower + " and _TIME_MICROS_int64 < " + upper, expectedCount);
+  }
+
+
+  @Test
+  public void testSelectTimestampColumns() throws Exception {
+    String query = "select _TIMESTAMP_MICROS_int64 as t from %s";
+    testBuilder()
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("t")
+        .baselineValues(toLocalDateTime(TIMESTAMP_MICROS_VALUES[0]))
+        .baselineValues(toLocalDateTime(TIMESTAMP_MICROS_VALUES[1]))
+        .baselineValues(toLocalDateTime(TIMESTAMP_MICROS_VALUES[2]))
+        .go();
+  }
+
+
+  @Test
+  public void testLessThanSmallestTimestamp() throws Exception {
+    // No timestamp values should be less than the smallest value in the parquet file
+    int expectedCount = 0;
+    String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[0]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 < " + timestampExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testLessThanMidTimestamp() throws Exception {
+    // The smallest timestamp value should be less than the middle value in the parquet file
+    int expectedCount = 1;
+    String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[1]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 < " + timestampExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testLessThanLargestTimestamp() throws Exception {
+    // The smallest and middle timestamp values should be less than the largest value in the parquet
+    // file
+    int expectedCount = 2;
+    String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[2]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 < " + timestampExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testLessThanTimestampLongIntoTheFuture() throws Exception {
+    // All test timestamps should be less than a timestamp several hundred years into the future
+    // See https://issues.apache.org/jira/browse/DRILL-8421
+    int expectedCount = 3;
+    String whereClause = "_TIMESTAMP_MICROS_int64 < TO_TIMESTAMP('2502-04-04 00:00:00', 'yyyy-MM-dd HH:mm:ss')";
+    executeFilterQuery(whereClause, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanSmallestTimestamp() throws Exception {
+    // The middle and largest timestamp values should be greater than the smallest value in the
+    // parquet file
+    int expectedCount = 2;
+    String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[0]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 > " + timestampExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanMidTimestamp() throws Exception {
+    // The largest timestamp value should be greater than the middle value in the parquet file
+    int expectedCount = 1;
+    String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[1]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 > " + timestampExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanLargestTimestamp() throws Exception {
+    // No timestamp values should be greater than the largest value in the parquet file
+    int expectedCount = 0;
+    String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[2]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 > " + timestampExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanTimestampLongIntoTheFuture() throws Exception {
+    // No test timestamps should be greater than a timestamp several hundred years into the future
+    // See https://issues.apache.org/jira/browse/DRILL-8421
+    int expectedCount = 0;
+    String whereClause = "_TIMESTAMP_MICROS_int64 > TO_TIMESTAMP('2502-04-04 00:00:00', 'yyyy-MM-dd HH:mm:ss')";
+    executeFilterQuery(whereClause, expectedCount);
+  }
+
+
+  @Test
+  public void testTimestampRange() throws Exception {
+    // The middle timestamp test value should be greater than the smallest value and less than the
+    // largest in the parquet file
+    String lower = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[0]);
+    String upper = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[2]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 > " + lower + " and _TIMESTAMP_MICROS_int64 < " + upper, 1);
+  }
+
+
+  private void executeFilterQuery(String whereClause, long expectedCount) throws Exception {
+    String query = "select count(*) as c from %s where " + whereClause;
+    testBuilder()
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(expectedCount)
+        .go();
+  }
+
+
+  public static void createParquetTestFile(String filePath) throws IOException {
+
+    MessageType messageType = MessageTypeParser.parseMessageType(SCHEMA);
+    GroupWriteSupport.setSchema(messageType, ParquetSimpleTestFileGenerator.conf);
+    SimpleGroupFactory groupFactory = new SimpleGroupFactory(messageType);
+
+    try (ParquetWriter<Group> writer = createParquetWriter(filePath)) {
+      for (int i=0; i<TIME_MICROS_VALUES.length; i++) {
+        writer.write(
+            groupFactory.newGroup()
+                .append("rowKey", i+1)
+                .append("_TIME_MICROS_int64", TIME_MICROS_VALUES[i])
+                .append("_TIMESTAMP_MICROS_int64", TIMESTAMP_MICROS_VALUES[i])
+        );
+      }
+    }
+  }
+
+
+  private static ParquetWriter<Group> createParquetWriter(String filePath) throws IOException {
+    return
+        ExampleParquetWriter.builder(ParquetSimpleTestFileGenerator.initFile(filePath))
+            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+            .withCompressionCodec(CompressionCodecName.GZIP)
+            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
+            .withConf(ParquetSimpleTestFileGenerator.conf)
+            .build();
+  }
+
+
+  private static String createToTimeFragment(long micros) {
+    return String.format(TO_TIME_TEMPLATE, TIME_FORMATTER.format(toLocalTime(micros)));
+  }
+
+
+  private static String createToTimestampFragment(long micros) {
+    return String.format(TO_TIMESTAMP_TEMPLATE, TIMESTAMP_FORMATTER.format(toLocalDateTime(micros)));
+  }
+
+
+  private static LocalTime toLocalTime(long micros) {
+    return LocalTime.ofNanoOfDay((micros/1000L) * 1000_000L);
+  }
+
+
+  private static LocalDateTime toLocalDateTime(long micros) {
+    return LocalDateTime.ofInstant(Instant.ofEpochMilli(micros/1000L), ZoneOffset.ofHours(0));
+  }
+
+
+  private static long toMicrosecondTime(int hour, int minute, int second, int microOfSecond) {
+    return LocalTime.of(hour, minute, second, microOfSecond*1000).toNanoOfDay() / 1000L;
+  }
+
+
+  private static long toMicrosecondTimestamp(
+      int year,
+      int month,
+      int dayOfMonth,
+      int hour,
+      int minute,
+      int second,
+      int microOfSecond) {
+
+    Instant instant =
+        LocalDateTime
+            .of(year, month, dayOfMonth, hour, minute, second, microOfSecond*1000)
+            .toInstant(ZoneOffset.ofHours(0));
+
+    return instant.getEpochSecond() * 1000_000L + instant.getNano() / 1000L;
+  }
+}
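
The binary resource below was produced by the createParquetTestFile()
method above. To regenerate it, an invocation along these lines should
work, with the output path resolved against a Drill checkout:

    // Regenerates the checked-in test resource.
    TestMicrosecondColumns.createParquetTestFile(
        "exec/java-exec/src/test/resources/parquet/microseconds.parquet");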
diff --git a/exec/java-exec/src/test/resources/parquet/microseconds.parquet b/exec/java-exec/src/test/resources/parquet/microseconds.parquet
new file mode 100644
index 0000000000..7bac7aa26d
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/microseconds.parquet differ