Posted to commits@impala.apache.org by mi...@apache.org on 2022/12/09 17:57:09 UTC

[impala] 02/03: IMPALA-11780: Wrong FILE__POSITION values for multi row group Parquet files when page filtering is used

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c56cd7b214dae8e4d31e093c71421f524452f53c
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed Dec 7 13:40:13 2022 +0100

    IMPALA-11780: Wrong FILE__POSITION values for multi row group Parquet files when page filtering is used
    
    Impala generated wrong values for the FILE__POSITION column when the
    Parquet file contained multiple row groups and page filtering was
    also used.
    
    We use the value of 'current_row_' in the Parquet column readers to
    populate the file position slot. The problem is that 'current_row_'
    denotes the index of the row within the row group, not within the
    file. We cannot simply make 'current_row_' file-based, because page
    filtering depends on its value: the page index also uses row
    group-based row indexes, not file-based ones.
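
    As a minimal sketch (simplified from the parquet-column-readers.h
    change below): 'current_row_' stays row group-relative, and the
    file-wide position is derived by adding the file-wide index of the
    row group's first row:

        #include <cstdint>

        class BaseScalarColumnReader {
         public:
          int64_t FilePositionOfCurrentRow() const {
            return row_group_first_row_ + current_row_;
          }

          // File-wide index of the row group's first row; set in Reset().
          int64_t row_group_first_row_ = 0;
          // Index of the current top-level row within the row group;
          // -1 until the first row has been processed.
          int64_t current_row_ = -1;
        };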
    
    It also turned out that FILE__POSITION was not set correctly in the
    Parquet late materialization code, because
    BaseScalarColumnReader::SkipRowsInternal() did not update
    'current_row_' in some code paths.
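
    The essence of that fix (a self-contained sketch with a hypothetical
    helper, not Impala's API; it assumes one value per top-level row,
    i.e. no repeated fields): skipping whole pages must still advance
    'current_row_' by each skipped page's value count.

        #include <cstddef>
        #include <cstdint>
        #include <vector>

        // Skip 'num_rows' rows by jumping over whole pages, keeping
        // '*current_row' in sync so FilePositionOfCurrentRow() stays
        // correct. Returns the rows left to skip inside page '*page_idx'.
        int64_t SkipWholePages(const std::vector<int64_t>& page_row_counts,
                               std::size_t* page_idx, int64_t num_rows,
                               int64_t* current_row) {
          while (*page_idx < page_row_counts.size() &&
                 num_rows > page_row_counts[*page_idx]) {
            num_rows -= page_row_counts[*page_idx];
            *current_row += page_row_counts[*page_idx];  // previously missing
            ++*page_idx;  // stands in for SkipPageData()/AdvanceNextPageHeader()
          }
          return num_rows;
        }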
    
    The value of FILE__POSITION is critical for Iceberg V2 tables, as
    position delete files store the file positions of deleted rows.
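
    For context (an illustration based on the Iceberg V2 spec, not
    Impala code): a position delete record identifies a deleted row by
    data file path plus the row's 0-based position within that file,
    which is why FILE__POSITION must be file-wide, not row
    group-relative.

        #include <cstdint>
        #include <string>

        // Shape of one Iceberg V2 position delete entry (simplified).
        struct PositionDelete {
          std::string file_path;  // data file containing the deleted row
          int64_t pos;            // 0-based row position within that file
        };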
    
    Testing:
     * added e2e tests
     * the tests now also run with PARQUET_READ_STATISTICS disabled to
       exercise more code paths
    
    Change-Id: I5ef37a1aa731eb54930d6689621cd6169fed6605
    Reviewed-on: http://gerrit.cloudera.org:8080/19328
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/parquet/parquet-column-readers.cc      |   8 +--
 be/src/exec/parquet/parquet-column-readers.h       |  14 +++++-
 testdata/data/README                               |  13 ++++-
 .../customer_nested_multiblock_multipage.parquet   | Bin 0 -> 807255 bytes
 .../virtual-column-file-position-parquet.test      |  56 +++++++++++++++++++++
 tests/query_test/test_scanners.py                  |  10 +++-
 6 files changed, 93 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index 20cfe38b7..15bd9bfbe 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -903,7 +903,7 @@ bool ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::DecodeValues(
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::
     ReadFilePositionBatched(int16_t rep_level, int64_t* file_pos) {
-  *file_pos = current_row_;
+  *file_pos = FilePositionOfCurrentRow();
 }
 
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
@@ -1068,7 +1068,8 @@ Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
   // See ColumnReader constructor.
   rep_level_ = max_rep_level() == 0 ? 0 : ParquetLevel::INVALID_LEVEL;
   pos_current_value_ = ParquetLevel::INVALID_POS;
-  current_row_ = row_group_first_row - 1;
+  row_group_first_row_ = row_group_first_row;
+  current_row_ = -1;
 
   vector<ScanRange::SubRange> sub_ranges;
   CreateSubRanges(&sub_ranges);
@@ -1466,7 +1467,7 @@ int BaseScalarColumnReader::FillPositionsInCandidateRange(int rows_remaining,
     }
     ++val_count;
     if (file_pos_writer.IsValid()) {
-      *file_pos_writer.Advance() = current_row_;
+      *file_pos_writer.Advance() = FilePositionOfCurrentRow();
     } else if (pos_writer.IsValid()) {
       if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
       *pos_writer.Advance() = pos_current_value_++;
@@ -1672,6 +1673,7 @@ bool BaseScalarColumnReader::SkipRowsInternal(int64_t num_rows, int64_t skip_row
       while (num_rows > current_page_values) {
         COUNTER_ADD(parent_->num_pages_skipped_by_late_materialization_counter_, 1);
         num_rows -= current_page_values;
+        current_row_ += current_page_values;
         if (!col_chunk_reader_.SkipPageData().ok() || !AdvanceNextPageHeader()) {
           return false;
         }
diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h
index c5aa058bd..d461c94e7 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -225,6 +225,9 @@ class ParquetColumnReader {
   const SlotDescriptor* pos_slot_desc_ = nullptr;
   const SlotDescriptor* file_pos_slot_desc_ = nullptr;
 
+  /// Index within the file of the first row in the row group.
+  int64_t row_group_first_row_ = 0;
+
   /// The next value to write into the position slot, if there is one. 64-bit int because
   /// the pos slot is always a BIGINT. Set to ParquetLevel::INVALID_POS when this column
   /// reader does not have a current rep and def level (i.e. before the first NextLevels()
@@ -376,6 +379,12 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   template <bool ADVANCE_REP_LEVEL>
   bool NextLevels();
 
+  /// Returns file position of current row ('current_row_' is the index of the row
+  /// within the row group).
+  int64_t FilePositionOfCurrentRow() const {
+    return row_group_first_row_ + current_row_;
+  }
+
  protected:
   // Friend parent scanner so it can perform validation (e.g. ValidateEndOfRowGroup())
   friend class HdfsParquetScanner;
@@ -414,7 +423,8 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// Metadata for the column for the current row group.
   const parquet::ColumnMetaData* metadata_ = nullptr;
 
-  /// Index of the current top-level row. It is updated together with the rep/def levels.
+  /// Index of the current top-level row within the row group. It is updated together
+  /// with the rep/def levels.
   /// When updated, and its value is N, it means that we already processed the Nth row
   /// completely, hence the initial value is '-1', because '0' would mean that we already
   /// processed the first (zeroeth) row.
@@ -685,7 +695,7 @@ inline void ParquetColumnReader::ReadFilePositionNonBatched(int64_t* file_pos) {
   DCHECK_GE(def_level_, 0);
   DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
       "Caller should have called NextLevels() until we are ready to read a value";
-  *file_pos = LastProcessedRow() + 1;
+  *file_pos = row_group_first_row_ + LastProcessedRow() + 1;
 }
 
 // Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
diff --git a/testdata/data/README b/testdata/data/README
index 6ac0a363e..9772cc2de 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -633,6 +633,17 @@ In order to generate this file, execute the following instruments:
    WHERE c_current_cdemo_sk IS NOT NULL ORDER BY c_current_cdemo_sk LIMIT 2000;
 The generated file will contain multiple blocks, with multiple pages per block.
 
+customer_nested_multiblock_multipage.parquet
+Parquet file that contains multiple row groups, multiple pages per row
+group, and stores nested data.
+Used Hive (version 3.1.3000.7.2.16.0-233) to generate Parquet file:
+1. SET parquet.block.size=8192;
+2. SET parquet.page.row.count.limit=20;
+3. CREATE TABLE customer_nested_multiblock_multipage
+   LIKE tpch_nested_parquet.customer STORED AS PARQUET;
+4. INSERT INTO customer_nested_multiblock_multipage
+   SELECT * FROM tpch_nested_parquet.customer ORDER BY c_custkey LIMIT 300;
+
 IMPALA-10361: Use field id to resolve columns for Iceberg tables
 We generated data by spark-shell, version is 2.4.x, and table data is in
 testdata/data/iceberg_test/hadoop_catalog/iceberg_resolution_test, this table
@@ -912,4 +923,4 @@ Converted similarly to iceberg_v2_no_deletes
 create_table_like_parquet_test.parquet:
 Generated by Hive
 create table iceberg_create_table_like_parquet_test (col_int int, col_float float, col_double double, col_string string, col_struct struct<col_int:int, col_float:float>, col_array array<string>, col_map map<string,array<int>>) stored as parquet;
-insert into iceberg_create_table_like_parquet_test values (0, 1.0, 2.0, "3", named_struct("col_int", 4, "col_float", cast(5.0 as float)), array("6","7","8"), map("A", array(11,12), "B", array(21,22)));
\ No newline at end of file
+insert into iceberg_create_table_like_parquet_test values (0, 1.0, 2.0, "3", named_struct("col_int", 4, "col_float", cast(5.0 as float)), array("6","7","8"), map("A", array(11,12), "B", array(21,22)));
diff --git a/testdata/data/customer_nested_multiblock_multipage.parquet b/testdata/data/customer_nested_multiblock_multipage.parquet
new file mode 100644
index 000000000..b4484c276
Binary files /dev/null and b/testdata/data/customer_nested_multiblock_multipage.parquet differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-parquet.test b/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-parquet.test
index 7ff2f9d19..bf54e6e5c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-parquet.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-parquet.test
@@ -221,3 +221,59 @@ where id = l_orderkey and
 ---- TYPES
 BIGINT, BIGINT, INT, BIGINT
 ====
+---- QUERY
+select file__position, * from customer_multiblock_page_index where c_customer_sk = 99331;
+---- RESULTS
+1119,99331,'AAAAAAAADAEIBAAA',22643,7071,15946,2452620,2452590,'Mr.','Joseph','Mueller','N',2,10,1970,'VANUATU','NULL','Joseph.Mueller@yF61aCQLo7P.com','2452638'
+---- TYPES
+BIGINT, INT, STRING, INT, INT, INT, INT, INT, STRING, STRING, STRING, STRING, INT, INT, INT, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select file__position, * from customer_multiblock_page_index where c_customer_sk > 10000 and c_customer_sk < 10500;
+---- RESULTS
+303,10380,'AAAAAAAAMIICAAAA',6555,6205,18255,2452491,2452461,'Sir','William','Hunter','N',4,7,1964,'NAURU','NULL','William.Hunter@Kljv1U5NvPb.org','2452386'
+1219,10396,'AAAAAAAAMJICAAAA',24791,6556,15968,2450865,2450835,'Mr.','Gerald','Manley','N',14,10,1939,'PORTUGAL','NULL','Gerald.Manley@JVFnqX5nFxPqVU.org','2452430'
+1292,10208,'AAAAAAAAAOHCAAAA',26660,2359,9211,2452596,2452566,'Dr.','Lowell','Amos','N',26,10,1977,'SENEGAL','NULL','Lowell.Amos@cCFzlM.org','2452391'
+1300,10343,'AAAAAAAAHGICAAAA',26829,222,42462,2451479,2451449,'Sir','Bruce','Grice','N',23,1,1949,'CAYMAN ISLANDS','NULL','Bruce.Grice@s6LUofKyc.edu','2452313'
+1738,10153,'AAAAAAAAJKHCAAAA',35544,1696,35848,2449126,2449096,'Mr.','Wesley','Hooker','Y',23,2,1986,'NEW ZEALAND','NULL','Wesley.Hooker@mFyd27PhXy.org','2452527'
+1842,10353,'AAAAAAAABHICAAAA',37434,1053,15825,2451525,2451495,'Mrs.','Lucinda','Pierson','N',13,3,1980,'ARUBA','NULL','Lucinda.Pierson@27yDPr8kIQ5I2qN.edu','2452450'
+1856,10316,'AAAAAAAAMEICAAAA',37920,2586,49299,2452571,2452541,'Ms.','Carolyn','Guajardo','Y',11,7,1931,'MYANMAR','NULL','Carolyn.Guajardo@7gSYU.com','2452600'
+---- TYPES
+BIGINT, INT, STRING, INT, INT, INT, INT, INT, STRING, STRING, STRING, STRING, INT, INT, INT, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select file__position, * from customer_multiblock_page_index where c_first_sales_date_sk = 2449000;
+---- RESULTS
+1452,99451,'AAAAAAAALHEIBAAA',29723,723,35323,2449030,2449000,'Mrs.','Judy','Grant','N',28,1,1955,'INDIA','NULL','Judy.Grant@YdXFRcSxUuNf1N5Ej.edu','2452566'
+---- TYPES
+BIGINT, INT, STRING, INT, INT, INT, INT, INT, STRING, STRING, STRING, STRING, INT, INT, INT, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select * from customer_nested_multiblock_multipage where c_custkey = 300;
+---- RESULTS
+300,'Customer#000000300','I0fJfo60DRqQ',7,'17-165-193-5964',8084.92,'AUTOMOBILE','p fluffily among the slyly express grouches. furiously express instruct'
+---- TYPES
+BIGINT, STRING, STRING, SMALLINT, STRING, DECIMAL, STRING, STRING
+====
+---- QUERY
+select * from customer_nested_multiblock_multipage where c_name = 'Customer#000000295';
+---- RESULTS
+295,'Customer#000000295','mk649IH6njR14woTVZ1cxtlNs URxBHD5o5z2',0,'10-340-773-4322',9497.89,'HOUSEHOLD','play according to the quickly ironic instructions-- unusual, bol'
+---- TYPES
+BIGINT, STRING, STRING, SMALLINT, STRING, DECIMAL, STRING, STRING
+====
+---- QUERY
+select file__position, l_shipdate from customer_nested_multiblock_multipage c, c.c_orders.o_lineitems where l_shipdate='1998-11-26';
+---- RESULTS
+80,'1998-11-26'
+---- TYPES
+BIGINT, STRING
+====
+---- QUERY
+select file__position, l_shipdate from customer_nested_multiblock_multipage c, c.c_orders.o_lineitems where l_partkey = 199994;
+---- RESULTS
+51,'1993-05-13'
+82,'1994-01-17'
+---- TYPES
+BIGINT, STRING
+====
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 76ed7a254..b4ab8b15b 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -500,12 +500,18 @@ class TestParquet(ImpalaTestSuite):
   def test_virtual_column_file_position_parquet(self, vector, unique_database):
     # Parquet-specific tests for virtual column FILE__POSITION
     create_table_from_parquet(self.client, unique_database, 'alltypes_tiny_pages')
+    create_table_from_parquet(self.client, unique_database,
+        'customer_multiblock_page_index')
+    create_table_from_parquet(self.client, unique_database,
+        'customer_nested_multiblock_multipage')
     new_vector = deepcopy(vector)
     for late_mat in [-1, 1, 17]:
       new_vector.get_value('exec_option')['parquet_late_materialization_threshold'] = \
           late_mat
-      self.run_test_case('QueryTest/virtual-column-file-position-parquet', new_vector,
-          unique_database)
+      for read_stats in ['true', 'false']:
+        new_vector.get_value('exec_option')['parquet_read_statistics'] = read_stats
+        self.run_test_case('QueryTest/virtual-column-file-position-parquet', new_vector,
+            unique_database)
 
   def test_corrupt_files(self, vector):
     new_vector = deepcopy(vector)