You are viewing a plain-text version of this content. The canonical link to it (the hyperlink "here" in the original message) is not preserved in this plain-text version.
Posted to commits@drill.apache.org by dz...@apache.org on 2022/01/19 10:06:02 UTC
[drill] branch master updated: DRILL-8023: Empty dict page breaks the "old" Parquet reader (#2430)
This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new b1324b5 DRILL-8023: Empty dict page breaks the "old" Parquet reader (#2430)
b1324b5 is described below
commit b1324b5db3a0591838df9ecf007caeca93b52421
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Wed Jan 19 12:05:52 2022 +0200
DRILL-8023: Empty dict page breaks the "old" Parquet reader (#2430)
* Fix empty dict page handling in "old" Parquet reader.
* Move empty_dict_page.parquet to a dir that doesn't break other tests.
* Fix pageData null check after nextInternal loop in PageReader.
---
.../parquet/columnreaders/AsyncPageReader.java | 18 ++++++----
.../store/parquet/columnreaders/PageReader.java | 39 +++++++++++++++++----
.../drill/exec/store/parquet/TestEmptyParquet.java | 30 ++++++++++++++++
.../parquet/empty/empty_dict_page.parquet | Bin 0 -> 3242 bytes
4 files changed, 74 insertions(+), 13 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 2b4fe88..d882504 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -261,7 +261,7 @@ class AsyncPageReader extends PageReader {
synchronized (pageQueueSyncronize) {
boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
readStatus = pageQueue.take(); // get the data if no exception has been thrown
- if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
+ if (readStatus == ReadStatus.EMPTY) {
throw new DrillRuntimeException("Unexpected end of data");
}
//if the queue was full before we took a page out, then there would
@@ -298,14 +298,20 @@ class AsyncPageReader extends PageReader {
ReadStatus readStatus = nextPageFromQueue();
pageHeader = readStatus.getPageHeader();
- if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
- loadDictionary(readStatus);
- // callers expect us to have a data page after next(), so we start over
- readStatus = nextPageFromQueue();
- pageHeader = readStatus.getPageHeader();
+ if (pageHeader.uncompressed_page_size == 0) {
+ logger.info(
+ "skipping a {} of size {} because its uncompressed size is 0 bytes.",
+ pageHeader.getType(),
+ pageHeader.compressed_page_size
+ );
+ skip(pageHeader.compressed_page_size);
+ return;
}
switch (pageHeader.getType()) {
+ case DICTIONARY_PAGE:
+ loadDictionary(readStatus);
+ break;
case DATA_PAGE:
pageData = codecName == CompressionCodecName.UNCOMPRESSED
? readStatus.getPageData()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 11d581e..16c9840 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -425,13 +425,20 @@ class PageReader {
protected void nextInternal() throws IOException {
readPageHeader();
- if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
- loadDictionary();
- // callers expect us to have a data page after next(), so we start over
- readPageHeader();
+ if (pageHeader.uncompressed_page_size == 0) {
+ logger.info(
+ "skipping a {} of size {} because its uncompressed size is 0 bytes.",
+ pageHeader.getType(),
+ pageHeader.compressed_page_size
+ );
+ skip(pageHeader.compressed_page_size);
+ return;
}
switch (pageHeader.getType()) {
+ case DICTIONARY_PAGE:
+ loadDictionary();
+ break;
case DATA_PAGE:
pageData = codecName == CompressionCodecName.UNCOMPRESSED
? readUncompressedPage()
@@ -445,6 +452,7 @@ class PageReader {
default:
logger.info("skipping a {} of size {}", pageHeader.getType(), pageHeader.compressed_page_size);
skip(pageHeader.compressed_page_size);
+
}
}
@@ -533,11 +541,28 @@ class PageReader {
}
clearDataBufferAndReaders();
- nextInternal();
+ do {
+ nextInternal();
+
+ if (pageHeader == null) {
+ throw new DrillRuntimeException(String.format(
+ "Failed to read another page having read %d of %d values from its " +
+ "column chunk.",
+ parentColumnReader.totalValuesRead,
+ totalValueCount
+ ));
+ }
+ } while (
+ // Continue until we hit a non-empty data page
+ pageHeader.uncompressed_page_size == 0
+ || (pageHeader.getType() != PageType.DATA_PAGE
+ && pageHeader.getType() != PageType.DATA_PAGE_V2)
+ );
- if (pageData == null || pageHeader == null) {
+ if (pageData == null) {
throw new DrillRuntimeException(String.format(
- "Failed to read another page having read %d of %d values from its column chunk.",
+ "Failed to read another page having read %d of %d values from its " +
+ "column chunk.",
parentColumnReader.totalValuesRead,
totalValueCount
));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
index 683d40f..780aa6e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
@@ -414,4 +414,34 @@ public class TestEmptyParquet extends ClusterTest {
String plan = queryBuilder().sql(sql).explainText();
assertTrue(plan.contains("usedMetadataFile=true"));
}
+
+ /**
+ * Test a Parquet file containing a zero-byte dictionary page, c.f.
+ * DRILL-8023.
+ */
+ @Test
+ public void testEmptyDictPage() throws Exception {
+ try {
+ client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
+ // Only column 'C' in empty_dict_page.parquet has an empty dictionary page.
+ String sql = "select A,B,C from dfs.`parquet/empty/empty_dict_page.parquet`";
+ RowSet actual = queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("A", TypeProtos.MinorType.BIGINT)
+ .addNullable("B", TypeProtos.MinorType.VARCHAR)
+ .addNullable("C", TypeProtos.MinorType.INT)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addRow(1, "a", null)
+ .addRow(2, "b", null)
+ .addRow(3, "c", null)
+ .build();
+
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ client.resetSession(ExecConstants.PARQUET_NEW_RECORD_READER);
+ }
+ }
}
diff --git a/exec/java-exec/src/test/resources/parquet/empty/empty_dict_page.parquet b/exec/java-exec/src/test/resources/parquet/empty/empty_dict_page.parquet
new file mode 100644
index 0000000..130e88d
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/empty/empty_dict_page.parquet differ