Posted to commits@drill.apache.org by dz...@apache.org on 2022/01/19 10:06:02 UTC

[drill] branch master updated: DRILL-8023: Empty dict page breaks the "old" Parquet reader (#2430)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new b1324b5  DRILL-8023: Empty dict page breaks the "old" Parquet reader (#2430)
b1324b5 is described below

commit b1324b5db3a0591838df9ecf007caeca93b52421
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Wed Jan 19 12:05:52 2022 +0200

    DRILL-8023: Empty dict page breaks the "old" Parquet reader (#2430)
    
    * Fix empty dict page handling in "old" Parquet reader.
    
    * Move empty_dict_page.parquet to a dir that doesn't break other tests.
    
    * Fix pageData null check after nextInternal loop in PageReader.
---
 .../parquet/columnreaders/AsyncPageReader.java     |  18 ++++++----
 .../store/parquet/columnreaders/PageReader.java    |  39 +++++++++++++++++----
 .../drill/exec/store/parquet/TestEmptyParquet.java |  30 ++++++++++++++++
 .../parquet/empty/empty_dict_page.parquet          | Bin 0 -> 3242 bytes
 4 files changed, 74 insertions(+), 13 deletions(-)
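
In outline, both readers gain the same early exit: if a page header reports an uncompressed size of 0 bytes, the page is skipped (its compressed bytes are consumed) before any type-specific handling, so an empty dictionary page never reaches loadDictionary(); dictionary pages otherwise become one more case of the page-type switch. The sketch below illustrates that control flow only. Header and PageType here are made-up stand-ins for Parquet's thrift PageHeader and Drill's reader state, and the printouts stand in for Drill's logger, skip() and decompression code; this is not the committed code.

    import java.util.Arrays;
    import java.util.List;

    public class EmptyPageSkipSketch {

      enum PageType { DICTIONARY_PAGE, DATA_PAGE, DATA_PAGE_V2, INDEX_PAGE }

      // Stand-in for org.apache.parquet.format.PageHeader.
      static class Header {
        final PageType type;
        final int uncompressedSize;
        final int compressedSize;

        Header(PageType type, int uncompressedSize, int compressedSize) {
          this.type = type;
          this.uncompressedSize = uncompressedSize;
          this.compressedSize = compressedSize;
        }
      }

      static void next(Header header) {
        // New guard: any page whose uncompressed size is 0 bytes is skipped before
        // the type switch, so an empty dictionary page never reaches loadDictionary().
        if (header.uncompressedSize == 0) {
          System.out.printf("skipping a %s of size %d because its uncompressed size is 0 bytes.%n",
              header.type, header.compressedSize);
          return;
        }
        // Dictionary pages are handled as a case of the type switch rather than as
        // a special pre-step that immediately read the following page.
        switch (header.type) {
          case DICTIONARY_PAGE:
            System.out.println("loadDictionary()");
            break;
          case DATA_PAGE:
          case DATA_PAGE_V2:
            System.out.println("decompress page and expose pageData");
            break;
          default:
            System.out.printf("skipping a %s of size %d%n", header.type, header.compressedSize);
        }
      }

      public static void main(String[] args) {
        List<Header> pages = Arrays.asList(
            new Header(PageType.DICTIONARY_PAGE, 0, 14),  // the DRILL-8023 case
            new Header(PageType.DATA_PAGE, 64, 32));
        pages.forEach(EmptyPageSkipSketch::next);
      }
    }
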

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 2b4fe88..d882504 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -261,7 +261,7 @@ class AsyncPageReader extends PageReader {
       synchronized (pageQueueSyncronize) {
         boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
         readStatus = pageQueue.take(); // get the data if no exception has been thrown
-        if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
+        if (readStatus == ReadStatus.EMPTY) {
           throw new DrillRuntimeException("Unexpected end of data");
         }
         //if the queue was full before we took a page out, then there would
@@ -298,14 +298,20 @@ class AsyncPageReader extends PageReader {
       ReadStatus readStatus = nextPageFromQueue();
       pageHeader = readStatus.getPageHeader();
 
-      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-        loadDictionary(readStatus);
-        // callers expect us to have a data page after next(), so we start over
-        readStatus = nextPageFromQueue();
-        pageHeader = readStatus.getPageHeader();
+      if (pageHeader.uncompressed_page_size == 0) {
+        logger.info(
+          "skipping a {} of size {} because its uncompressed size is 0 bytes.",
+          pageHeader.getType(),
+          pageHeader.compressed_page_size
+        );
+        skip(pageHeader.compressed_page_size);
+        return;
       }
 
       switch (pageHeader.getType()) {
+        case DICTIONARY_PAGE:
+          loadDictionary(readStatus);
+          break;
         case DATA_PAGE:
           pageData = codecName == CompressionCodecName.UNCOMPRESSED
             ? readStatus.getPageData()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 11d581e..16c9840 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -425,13 +425,20 @@ class PageReader {
   protected void nextInternal() throws IOException {
     readPageHeader();
 
-    if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-      loadDictionary();
-      // callers expect us to have a data page after next(), so we start over
-      readPageHeader();
+    if (pageHeader.uncompressed_page_size == 0) {
+      logger.info(
+        "skipping a {} of size {} because its uncompressed size is 0 bytes.",
+        pageHeader.getType(),
+        pageHeader.compressed_page_size
+      );
+      skip(pageHeader.compressed_page_size);
+      return;
     }
 
     switch (pageHeader.getType()) {
+      case DICTIONARY_PAGE:
+        loadDictionary();
+        break;
       case DATA_PAGE:
         pageData = codecName == CompressionCodecName.UNCOMPRESSED
           ? readUncompressedPage()
@@ -445,6 +452,7 @@ class PageReader {
       default:
         logger.info("skipping a {} of size {}", pageHeader.getType(), pageHeader.compressed_page_size);
         skip(pageHeader.compressed_page_size);
+
     }
   }
 
@@ -533,11 +541,28 @@ class PageReader {
     }
 
     clearDataBufferAndReaders();
-    nextInternal();
+    do {
+      nextInternal();
+
+      if (pageHeader == null) {
+        throw new DrillRuntimeException(String.format(
+          "Failed to read another page having read %d of %d values from its " +
+          "column chunk.",
+          parentColumnReader.totalValuesRead,
+          totalValueCount
+        ));
+      }
+    } while (
+      // Continue until we hit a non-empty data page
+      pageHeader.uncompressed_page_size == 0
+      || (pageHeader.getType() != PageType.DATA_PAGE
+      && pageHeader.getType() != PageType.DATA_PAGE_V2)
+    );
 
-    if (pageData == null || pageHeader == null) {
+    if (pageData == null) {
       throw new DrillRuntimeException(String.format(
-        "Failed to read another page having read %d of %d values from its column chunk.",
+        "Failed to read another page having read %d of %d values from its " +
+        "column chunk.",
         parentColumnReader.totalValuesRead,
         totalValueCount
       ));
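
The second half of the PageReader change replaces the single nextInternal() call with a do-while loop: next() now keeps reading pages until it is positioned on a non-empty DATA_PAGE or DATA_PAGE_V2, and the pageHeader null check moves inside the loop so an exhausted column chunk fails immediately. Below is a rough standalone sketch of that loop, using made-up stand-in types (Header, PageType, an iterator in place of nextInternal()) rather than Drill's real PageReader; it illustrates the loop condition only.

    import java.util.Arrays;
    import java.util.Iterator;

    public class NextDataPageLoopSketch {

      enum PageType { DICTIONARY_PAGE, DATA_PAGE, DATA_PAGE_V2 }

      static class Header {
        final PageType type;
        final int uncompressedSize;

        Header(PageType type, int uncompressedSize) {
          this.type = type;
          this.uncompressedSize = uncompressedSize;
        }
      }

      // Stand-in for next(): advance (here, via an iterator instead of nextInternal())
      // until the current page is a non-empty DATA_PAGE or DATA_PAGE_V2, and fail
      // if the column chunk runs out of pages before that happens.
      static Header nextDataPage(Iterator<Header> columnChunk) {
        Header header;
        do {
          header = columnChunk.hasNext() ? columnChunk.next() : null;
          if (header == null) {
            throw new IllegalStateException(
                "Failed to read another page from its column chunk.");
          }
        } while (
            // Continue until we hit a non-empty data page
            header.uncompressedSize == 0
            || (header.type != PageType.DATA_PAGE
                && header.type != PageType.DATA_PAGE_V2));
        return header;
      }

      public static void main(String[] args) {
        Iterator<Header> pages = Arrays.asList(
            new Header(PageType.DICTIONARY_PAGE, 0),  // empty dictionary page: skipped
            new Header(PageType.DATA_PAGE, 128)       // first usable data page
        ).iterator();
        System.out.println(nextDataPage(pages).type);  // prints DATA_PAGE
      }
    }
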
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
index 683d40f..780aa6e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
@@ -414,4 +414,34 @@ public class TestEmptyParquet extends ClusterTest {
     String plan = queryBuilder().sql(sql).explainText();
     assertTrue(plan.contains("usedMetadataFile=true"));
   }
+
+  /**
+   * Test a Parquet file containing a zero-byte dictionary page, c.f.
+   * DRILL-8023.
+   */
+  @Test
+  public void testEmptyDictPage() throws Exception {
+    try {
+      client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
+      // Only column 'C' in empty_dict_page.parquet has an empty dictionary page.
+      String sql = "select A,B,C from dfs.`parquet/empty/empty_dict_page.parquet`";
+      RowSet actual = queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("A", TypeProtos.MinorType.BIGINT)
+        .addNullable("B", TypeProtos.MinorType.VARCHAR)
+        .addNullable("C", TypeProtos.MinorType.INT)
+        .buildSchema();
+
+      RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow(1, "a", null)
+        .addRow(2, "b", null)
+        .addRow(3, "c", null)
+        .build();
+
+      RowSetUtilities.verify(expected, actual);
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/resources/parquet/empty/empty_dict_page.parquet b/exec/java-exec/src/test/resources/parquet/empty/empty_dict_page.parquet
new file mode 100644
index 0000000..130e88d
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/empty/empty_dict_page.parquet differ
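
The new test exercises the "old" (non-async) reader against the checked-in empty_dict_page.parquet file. To verify the fix locally, the test can be run on its own from the exec/java-exec module once the project has been built; the command below is just the standard Maven Surefire single-test invocation, not something mandated by this commit:

    cd exec/java-exec
    mvn test -Dtest=TestEmptyParquet#testEmptyDictPage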