Posted to commits@impala.apache.org by st...@apache.org on 2022/04/02 05:43:43 UTC

[impala] 02/02: IMPALA-11039: Fix incorrect page jumping in late materialization of Parquet

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0fb14962d7db7be8efcf0559b1781872b3e36e6e
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Thu Mar 31 17:52:46 2022 +0800

    IMPALA-11039: Fix incorrect page jumping in late materialization of Parquet
    
    The current calculation of LastRowIdxInCurrentPage() is incorrect: it
    uses the first row index of the next candidate page instead of the next
    valid page. The next candidate page could be far away from the current
    page, yielding a number larger than the current page size, so skipping
    rows in the current page could overflow the page boundary. This patch
    fixes LastRowIdxInCurrentPage() to use the next valid page.
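    
    For illustration, a minimal standalone sketch of the corrected lookup,
    using simplified stand-in types rather than the reader's real state
    (the names below are hypothetical; IsValid() mirrors the
    IsValidPageLocation() helper this patch moves into parquet-common.h):
    
        #include <cstdint>
        #include <vector>
    
        struct PageLocation { int64_t offset; int64_t first_row_index; };
    
        bool IsValid(const PageLocation& p, int64_t num_rows) {
          return p.offset >= 0 && p.first_row_index >= 0
              && p.first_row_index < num_rows;
        }
    
        // Last row index of page 'cur': one less than the first row index of
        // the next *valid* page, not the next *candidate* page. Falls back to
        // the row group's last row when no later valid page exists.
        int64_t LastRowIdx(const std::vector<PageLocation>& pages, int cur,
            int64_t num_rows) {
          for (size_t i = cur + 1; i < pages.size(); ++i) {
            if (IsValid(pages[i], num_rows)) return pages[i].first_row_index - 1;
          }
          return num_rows - 1;
        }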
    
    When skip_row_id is set (>0), the current approach of
    SkipRowsInternal<false>() expects to jump to the page containing this
    row and then skip rows within that page. However, the expected row
    might not be in the candidate pages. When we jump to the next candidate
    page, the target row could already have been skipped; in that case, we
    don't need to skip rows in the current page.
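    
    As a concrete (hypothetical) example: suppose the candidate pages start
    at rows 0 and 5000, and skip_row_id = 2500 falls in a page pruned by
    the page index. Jumping to the candidate page at row 5000 already moves
    past row 2500, so an unconditional in-page skip would wrongly discard
    live rows from that page. A minimal runnable sketch of the guard this
    patch adds (toy values, not the reader's real state):
    
        #include <cstdint>
        #include <iostream>
    
        int main() {
          // Hypothetical layout: candidate pages start at rows 0 and 5000;
          // rows 1000..4999 sit in pages pruned by the page index.
          const int64_t first_row_in_cur_page = 5000;  // After the jump loop.
          const int64_t last_processed_row = 999;      // Last row read so far.
          const int64_t skip_row_id = 2500;            // Target row (pruned page).
    
          if (skip_row_id >= first_row_in_cur_page) {
            // Target lies in the current page: skip up to it.
            std::cout << "skip " << (skip_row_id - last_processed_row)
                      << " rows in current page\n";
          } else {
            // The jump already passed the target row: no in-page skip needed.
            std::cout << "already past row " << skip_row_id << "\n";
          }
          return 0;
        }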
    
    Tests:
     - Add a test on alltypes_empty_pages to reveal the bug.
     - Add more batch_size values in test_page_index.
     - Pass tests/query_test/test_parquet_stats.py locally.
    
    Change-Id: I3a783115ba8faf1a276e51087f3a70f79402c21d
    Reviewed-on: http://gerrit.cloudera.org:8080/18372
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  6 ++-
 be/src/exec/parquet/parquet-column-readers.cc      | 21 ++++----
 be/src/exec/parquet/parquet-column-readers.h       | 24 +++++----
 be/src/exec/parquet/parquet-common.cc              |  7 ---
 be/src/exec/parquet/parquet-common.h               |  7 +++
 .../queries/QueryTest/parquet-page-index.test      | 60 +++++++++++++++++++++-
 tests/query_test/test_parquet_stats.py             |  2 +-
 7 files changed, 97 insertions(+), 30 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 71f2079..fcc4c85 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -2374,7 +2374,11 @@ Status HdfsParquetScanner::SkipRowsForColumns(
       // Skipping may fail for corrupted Parquet file due to mismatch of rows
       // among columns.
       if (UNLIKELY(!col_reader->SkipRows(*num_rows_to_skip, *skip_to_row))) {
-        return Status(Substitute("Error in skipping rows in file $0.", filename()));
+        return Status(Substitute(
+            "Parquet file might be corrupted: Error in skipping $0 values to row $1 "
+            "in column $2 of file $3.",
+            *num_rows_to_skip, *skip_to_row, col_reader->schema_element().name,
+            filename()));
       }
     }
     *num_rows_to_skip = 0;
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index 70e736e..ec84f08 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -1523,6 +1523,8 @@ void BaseScalarColumnReader::SetLevelDecodeError(
 /// For page filtering, we keep track of first and last page indexes and keep
 /// traversing to next page until we find a page that contains 'skip_row_id'.
 /// At that point, we can just skip to the required row id.
+/// If the page containing 'skip_row_id' is not a candidate page, we will stop
+/// at the next candidate page, and 'skip_row_id' is skipped along the way.
 /// Difference between scenario 2 and 3 is that in scenario 2, we end up
 /// decompressing all the pages being skipped, whereas in scenario 3 we only
 /// decompress the pages required and avoid unneeded decompression. This is possible
@@ -1538,22 +1540,23 @@ bool BaseScalarColumnReader::SkipRowsInternal(int64_t num_rows, int64_t skip_row
         return false;
       }
     }
-    int last_row_idx = LastRowIdxInCurrentPage();
-    // Keep advancing until we hit reach required page containing 'skip_row_id'
-    while (skip_row_id > last_row_idx) {
+    // Keep advancing until we reach the required page containing 'skip_row_id',
+    // or jump over it if that page is not a candidate page.
+    while (skip_row_id > LastRowIdxInCurrentPage()) {
       COUNTER_ADD(parent_->num_pages_skipped_by_late_materialization_counter_, 1);
       if (UNLIKELY(!JumpToNextPage())) {
         return false;
       }
-      last_row_idx = LastRowIdxInCurrentPage();
     }
-    DCHECK_GE(skip_row_id, FirstRowIdxInCurrentPage());
     int64_t last_row = LastProcessedRow();
     int64_t remaining = 0;
-    // Skip to the required row id within the page.
-    if (last_row < skip_row_id) {
-      if (UNLIKELY(!SkipTopLevelRows(skip_row_id - last_row, &remaining))) {
-        return false;
+    if (skip_row_id >= FirstRowIdxInCurrentPage()) {
+      // Skip to the required row id within the page. This is only needed when
+      // the row id lies in the current page.
+      if (last_row < skip_row_id) {
+        if (UNLIKELY(!SkipTopLevelRows(skip_row_id - last_row, &remaining))) {
+          return false;
+        }
       }
     }
     // also need to adjust 'candidate_row_ranges' as we skipped to new row id.
diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h
index 63ab694..51c0b50 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -380,7 +380,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// Collection of page indexes that we are going to read. When we use page filtering,
   /// we issue a scan-range with sub-ranges that belong to the candidate data pages, i.e.
   /// we will not even see the bytes of the filtered out pages.
-  /// It is set in HdfsParquetScanner::CalculateCandidatePagesForColumns().
+  /// It is set in HdfsParquetScanner::ComputeCandidatePagesForColumns().
   std::vector<int> candidate_data_pages_;
 
   /// Stores an index to 'candidate_data_pages_'. It is the currently read data page when
@@ -544,19 +544,21 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   }
 
   // Returns the last row index of the current page. It is one less than first row index
-  // of next page. For last page, it is one less than 'num_rows' of row group.
+  // of the next valid page. For the last page, it is one less than the row group's 'num_rows'.
   int64_t LastRowIdxInCurrentPage() const {
     DCHECK(!candidate_data_pages_.empty());
-    DCHECK_LE(candidate_page_idx_, candidate_data_pages_.size() - 1) ;
-    if (candidate_page_idx_ == candidate_data_pages_.size() - 1) {
-      parquet::RowGroup& row_group =
-          parent_->file_metadata_.row_groups[parent_->row_group_idx_];
-      return row_group.num_rows - 1;
-    } else {
-      return offset_index_.page_locations[candidate_data_pages_[candidate_page_idx_ + 1]]
-                 .first_row_index
-          - 1;
+    int64_t num_rows =
+        parent_->file_metadata_.row_groups[parent_->row_group_idx_].num_rows;
+    // Find the next valid page.
+    int page_idx = candidate_data_pages_[candidate_page_idx_] + 1;
+    while (page_idx < offset_index_.page_locations.size()) {
+      const auto& page_loc = offset_index_.page_locations[page_idx];
+      if (IsValidPageLocation(page_loc, num_rows)) {
+        return page_loc.first_row_index - 1;
+      }
+      ++page_idx;
     }
+    return num_rows - 1;
   }
 
   /// Wrapper around 'SkipTopLevelRows' to skip across multiple pages.
diff --git a/be/src/exec/parquet/parquet-common.cc b/be/src/exec/parquet/parquet-common.cc
index abb817f..34133ea 100644
--- a/be/src/exec/parquet/parquet-common.cc
+++ b/be/src/exec/parquet/parquet-common.cc
@@ -133,13 +133,6 @@ bool ComputeCandidateRanges(const int64_t num_rows, vector<RowRange>* skip_range
   return true;
 }
 
-inline bool IsValidPageLocation(const parquet::PageLocation& page_loc,
-    const int64_t num_rows) {
-  return page_loc.offset >= 0 &&
-         page_loc.first_row_index >= 0 &&
-         page_loc.first_row_index < num_rows;
-}
-
 static bool ValidatePageLocations(const vector<parquet::PageLocation>& page_locations,
     const int64_t num_rows) {
   int last_valid_idx = -1;
diff --git a/be/src/exec/parquet/parquet-common.h b/be/src/exec/parquet/parquet-common.h
index 1284722..0e54779 100644
--- a/be/src/exec/parquet/parquet-common.h
+++ b/be/src/exec/parquet/parquet-common.h
@@ -103,6 +103,13 @@ struct PageRange {
   }
 };
 
+inline bool IsValidPageLocation(const parquet::PageLocation& page_loc,
+    const int64_t num_rows) {
+  return page_loc.offset >= 0 &&
+         page_loc.first_row_index >= 0 &&
+         page_loc.first_row_index < num_rows;
+}
+
 /// Returns the row range for a given page range using information from the row group
 /// and offset index.
 void GetRowRangeForPageRange(const parquet::RowGroup& row_group,
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test
index f37a064..9bf7b53 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test
@@ -248,6 +248,64 @@ INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIM
 aggregation(SUM, NumStatsFilteredPages): 203
 ====
 ---- QUERY
+select * from alltypes_empty_pages where int_col=0
+---- RESULTS
+0,true,0,0,0,0,0.0,0.0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+10,true,0,0,0,0,0.0,0.0,'01/02/09','0',2009-01-02 00:10:00.450000000,2009,1
+20,true,0,0,0,0,0.0,0.0,'01/03/09','0',2009-01-03 00:20:00.900000000,2009,1
+30,true,0,0,0,0,0.0,0.0,'01/04/09','0',2009-01-04 00:30:01.350000000,2009,1
+40,true,0,0,0,0,0.0,0.0,'01/05/09','0',2009-01-05 00:40:01.800000000,2009,1
+50,true,0,0,0,0,0.0,0.0,'01/06/09','0',2009-01-06 00:50:02.250000000,2009,1
+60,true,0,0,0,0,0.0,0.0,'01/07/09','0',2009-01-07 01:00:02.700000000,2009,1
+70,true,0,0,0,0,0.0,0.0,'01/08/09','0',2009-01-08 01:10:03.150000000,2009,1
+80,true,0,0,0,0,0.0,0.0,'01/09/09','0',2009-01-09 01:20:03.600000000,2009,1
+90,true,0,0,0,0,0.0,0.0,'01/10/09','0',2009-01-10 01:30:04.500000000,2009,1
+100,true,0,0,0,0,0.0,0.0,'01/11/09','0',2009-01-11 01:40:04.500000000,2009,1
+110,true,0,0,0,0,0.0,0.0,'01/12/09','0',2009-01-12 01:50:04.950000000,2009,1
+120,true,0,0,0,0,0.0,0.0,'01/13/09','0',2009-01-13 02:00:05.400000000,2009,1
+130,true,0,0,0,0,0.0,0.0,'01/14/09','0',2009-01-14 02:10:05.850000000,2009,1
+140,true,0,0,0,0,0.0,0.0,'01/15/09','0',2009-01-15 02:20:06.300000000,2009,1
+150,true,0,0,0,0,0.0,0.0,'01/16/09','0',2009-01-16 02:30:06.750000000,2009,1
+160,true,0,0,0,0,0.0,0.0,'01/17/09','0',2009-01-17 02:40:07.200000000,2009,1
+170,true,0,0,0,0,0.0,0.0,'01/18/09','0',2009-01-18 02:50:07.650000000,2009,1
+180,true,0,0,0,0,0.0,0.0,'01/19/09','0',2009-01-19 03:00:08.100000000,2009,1
+190,true,0,0,0,0,0.0,0.0,'01/20/09','0',2009-01-20 03:10:08.550000000,2009,1
+200,true,0,0,0,0,0.0,0.0,'01/21/09','0',2009-01-21 03:20:09,2009,1
+210,true,0,0,0,0,0.0,0.0,'01/22/09','0',2009-01-22 03:30:09.450000000,2009,1
+220,true,0,0,0,0,0.0,0.0,'01/23/09','0',2009-01-23 03:40:09.900000000,2009,1
+230,true,0,0,0,0,0.0,0.0,'01/24/09','0',2009-01-24 03:50:10.350000000,2009,1
+240,true,0,0,0,0,0.0,0.0,'01/25/09','0',2009-01-25 04:00:10.800000000,2009,1
+250,true,0,0,0,0,0.0,0.0,'01/26/09','0',2009-01-26 04:10:11.250000000,2009,1
+260,true,0,0,0,0,0.0,0.0,'01/27/09','0',2009-01-27 04:20:11.700000000,2009,1
+270,true,0,0,0,0,0.0,0.0,'01/28/09','0',2009-01-28 04:30:12.150000000,2009,1
+280,true,0,0,0,0,0.0,0.0,'01/29/09','0',2009-01-29 04:40:12.600000000,2009,1
+290,true,0,0,0,0,0.0,0.0,'01/30/09','0',2009-01-30 04:50:13.500000000,2009,1
+300,true,0,0,0,0,0.0,0.0,'01/31/09','0',2009-01-31 05:00:13.500000000,2009,1
+310,true,0,0,0,0,0.0,0.0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+320,true,0,0,0,0,0.0,0.0,'02/02/09','0',2009-02-02 00:10:00.450000000,2009,2
+330,true,0,0,0,0,0.0,0.0,'02/03/09','0',2009-02-03 00:20:00.900000000,2009,2
+340,true,0,0,0,0,0.0,0.0,'02/04/09','0',2009-02-04 00:30:01.350000000,2009,2
+350,true,0,0,0,0,0.0,0.0,'02/05/09','0',2009-02-05 00:40:01.800000000,2009,2
+360,true,0,0,0,0,0.0,0.0,'02/06/09','0',2009-02-06 00:50:02.250000000,2009,2
+370,true,0,0,0,0,0.0,0.0,'02/07/09','0',2009-02-07 01:00:02.700000000,2009,2
+380,true,0,0,0,0,0.0,0.0,'02/08/09','0',2009-02-08 01:10:03.150000000,2009,2
+390,true,0,0,0,0,0.0,0.0,'02/09/09','0',2009-02-09 01:20:03.600000000,2009,2
+400,true,0,0,0,0,0.0,0.0,'02/10/09','0',2009-02-10 01:30:04.500000000,2009,2
+410,true,0,0,0,0,0.0,0.0,'02/11/09','0',2009-02-11 01:40:04.500000000,2009,2
+420,true,0,0,0,0,0.0,0.0,'02/12/09','0',2009-02-12 01:50:04.950000000,2009,2
+430,true,0,0,0,0,0.0,0.0,'02/13/09','0',2009-02-13 02:00:05.400000000,2009,2
+440,true,0,0,0,0,0.0,0.0,'02/14/09','0',2009-02-14 02:10:05.850000000,2009,2
+450,true,0,0,0,0,0.0,0.0,'02/15/09','0',2009-02-15 02:20:06.300000000,2009,2
+460,true,0,0,0,0,0.0,0.0,'02/16/09','0',2009-02-16 02:30:06.750000000,2009,2
+470,true,0,0,0,0,0.0,0.0,'02/17/09','0',2009-02-17 02:40:07.200000000,2009,2
+480,true,0,0,0,0,0.0,0.0,'02/18/09','0',2009-02-18 02:50:07.650000000,2009,2
+490,true,0,0,0,0,0.0,0.0,'02/19/09','0',2009-02-19 03:00:08.100000000,2009,2
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumStatsFilteredPages): 17
+====
+---- QUERY
 # Query table with invalid offset index.
 set abort_on_error=1;
 select * from alltypes_invalid_pages where id = 109
@@ -336,4 +394,4 @@ BIGINT
 drop table lineitem_comment;
 ---- RESULTS
 'Table has been dropped.'
-====
\ No newline at end of file
+====
diff --git a/tests/query_test/test_parquet_stats.py b/tests/query_test/test_parquet_stats.py
index 28a2288..c9663af 100644
--- a/tests/query_test/test_parquet_stats.py
+++ b/tests/query_test/test_parquet_stats.py
@@ -101,7 +101,7 @@ class TestParquetStats(ImpalaTestSuite):
       self.run_test_case('QueryTest/parquet-page-index-alltypes-tiny-pages-plain',
                          new_vector, unique_database)
 
-    for batch_size in [0, 32]:
+    for batch_size in [0, 1, 2, 3, 4, 8, 16, 32, 64, 128, 256, 512]:
       new_vector.get_value('exec_option')['batch_size'] = batch_size
       self.run_test_case('QueryTest/parquet-page-index-large', new_vector,
                          unique_database)