Posted to commits@impala.apache.org by st...@apache.org on 2022/04/02 05:43:41 UTC

[impala] branch master updated (8e755e7 -> 0fb1496)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 8e755e7  IMPALA-11126: impala-shell: Support configurable socket timeout for http client
     new 84d67f0  IMPALA-11218: TestIcebergTable.test_table_load_time_for_many_files flaky
     new 0fb1496  IMPALA-11039: Fix incorrect page jumping in late materialization of Parquet

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  6 ++-
 be/src/exec/parquet/parquet-column-readers.cc      | 21 ++++----
 be/src/exec/parquet/parquet-column-readers.h       | 24 +++++----
 be/src/exec/parquet/parquet-common.cc              |  7 ---
 be/src/exec/parquet/parquet-common.h               |  7 +++
 .../queries/QueryTest/parquet-page-index.test      | 60 +++++++++++++++++++++-
 tests/query_test/test_iceberg.py                   |  4 +-
 tests/query_test/test_parquet_stats.py             |  2 +-
 8 files changed, 99 insertions(+), 32 deletions(-)

[impala] 01/02: IMPALA-11218: TestIcebergTable.test_table_load_time_for_many_files flaky

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 84d67f09b4864f292ca56dc05464e7a6a16bc0f4
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Apr 1 21:04:27 2022 +0200

    IMPALA-11218: TestIcebergTable.test_table_load_time_for_many_files flaky
    
    In some test runs test_table_load_time_for_many_files failed because
    the time limit (3 seconds) was too short. In the failed runs the
    DESCRIBE statement took around 4 seconds to execute.
    
    This patch increases the time limit to 10 seconds when HDFS is used
    and to 20 seconds otherwise. This should be long enough to fix the
    flakiness, but short enough to still catch pathologically long table
    loads (like the one that was fixed by IMPALA-11214). A sketch of the
    timing check follows the commit message.
    
    Change-Id: Ic48bb561e97c008c6f4e51fc9d9ec8b3f7cdaf9e
    Reviewed-on: http://gerrit.cloudera.org:8080/18376
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
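
The check that this patch loosens follows a simple pattern: time the DESCRIBE
statement and assert that the elapsed time stays under a filesystem-dependent
limit. A minimal sketch of that pattern in Python, assuming a hypothetical
run_describe() stand-in for the suite's execute_query() call and using the
limits from the diff below (10 seconds on HDFS, 20 seconds otherwise):

  import time

  def check_describe_latency(run_describe, on_hdfs):
      # run_describe is a hypothetical callable, e.g. one that issues
      # "describe formatted <table>" through the Impala test client.
      start_time = time.time()
      run_describe()
      elapsed_time = time.time() - start_time
      # Limits from this patch: 10s on HDFS, 20s on other filesystems.
      time_limit = 10 if on_hdfs else 20
      assert elapsed_time < time_limit, \
          "DESCRIBE took %.1fs, limit is %ds" % (elapsed_time, time_limit)
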
---
 tests/query_test/test_iceberg.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 23b934f..322b7c9 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -563,9 +563,9 @@ class TestIcebergTable(ImpalaTestSuite):
     self.execute_query("describe formatted {}".format(tbl_name))
     elapsed_time = time.time() - start_time
     if IS_HDFS:
-      time_limit = 3
-    else:
       time_limit = 10
+    else:
+      time_limit = 20
     assert elapsed_time < time_limit
 
   def test_consistent_scheduling(self, vector, unique_database):

[impala] 02/02: IMPALA-11039: Fix incorrect page jumping in late materialization of Parquet

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0fb14962d7db7be8efcf0559b1781872b3e36e6e
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Thu Mar 31 17:52:46 2022 +0800

    IMPALA-11039: Fix incorrect page jumping in late materialization of Parquet
    
    The current calculation of LastRowIdxInCurrentPage() is incorrect: it
    uses the first row index of the next candidate page instead of the
    next valid page. The next candidate page can be far away from the
    current page, so the computed value overstates the size of the current
    page, and skipping rows within the current page can then run past the
    page boundary. This patch fixes LastRowIdxInCurrentPage() to use the
    next valid page.
    
    When skip_row_id is set (> 0), the current approach of
    SkipRowsInternal<false>() expects to jump to a page containing this
    row and then skip rows within that page. However, the target row might
    not be in any candidate page: when we jump to the next candidate page,
    the target row may already have been skipped over, and in that case we
    don't need to skip rows in the current page. A simplified sketch of
    this logic follows the commit message.
    
    Tests:
     - Add a test on alltypes_empty_pages to reveal the bug.
     - Add more batch_size values in test_page_index.
     - Pass tests/query_test/test_parquet_stats.py locally.
    
    Change-Id: I3a783115ba8faf1a276e51087f3a70f79402c21d
    Reviewed-on: http://gerrit.cloudera.org:8080/18372
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
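
The fix can be illustrated with a simplified model of the page-filtering
bookkeeping. The sketch below is illustrative Python, not the actual C++
reader code: page_locations stands in for the offset index, candidate_pages
holds the indexes of the candidate data pages, and the last row index of a
page is derived from the next valid page (as this patch does) rather than
from the next candidate page.

  from collections import namedtuple

  # Minimal stand-in for parquet::PageLocation entries in the offset index.
  PageLocation = namedtuple("PageLocation", ["offset", "first_row_index"])

  def is_valid_page_location(loc, num_rows):
      return loc.offset >= 0 and 0 <= loc.first_row_index < num_rows

  def last_row_idx_in_page(page_locations, page_idx, num_rows):
      # Scan forward to the next *valid* page; its first row index minus one
      # bounds the current page. Using the next *candidate* page instead (the
      # old behavior) can return a bound far beyond the current page.
      for loc in page_locations[page_idx + 1:]:
          if is_valid_page_location(loc, num_rows):
              return loc.first_row_index - 1
      return num_rows - 1  # last page: bounded by the row group's row count

  def stop_page_for_skip(page_locations, candidate_pages, cur, skip_row_id,
                         num_rows):
      # Advance through candidate pages until the current page's last row id
      # reaches 'skip_row_id'. If the page holding 'skip_row_id' was filtered
      # out, we stop on a later candidate page and the target row has already
      # been jumped over, so no in-page skipping is needed.
      while skip_row_id > last_row_idx_in_page(page_locations,
                                               candidate_pages[cur], num_rows):
          cur += 1
      first_row = page_locations[candidate_pages[cur]].first_row_index
      return cur, skip_row_id >= first_row  # (stop page, skip within page?)

When pages between candidates are filtered out, the loop can jump past the
page that actually contains skip_row_id; the second return value is then
False, mirroring the new check against FirstRowIdxInCurrentPage() in the
diff below.
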
---
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  6 ++-
 be/src/exec/parquet/parquet-column-readers.cc      | 21 ++++----
 be/src/exec/parquet/parquet-column-readers.h       | 24 +++++----
 be/src/exec/parquet/parquet-common.cc              |  7 ---
 be/src/exec/parquet/parquet-common.h               |  7 +++
 .../queries/QueryTest/parquet-page-index.test      | 60 +++++++++++++++++++++-
 tests/query_test/test_parquet_stats.py             |  2 +-
 7 files changed, 97 insertions(+), 30 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 71f2079..fcc4c85 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -2374,7 +2374,11 @@ Status HdfsParquetScanner::SkipRowsForColumns(
       // Skipping may fail for corrupted Parquet file due to mismatch of rows
       // among columns.
       if (UNLIKELY(!col_reader->SkipRows(*num_rows_to_skip, *skip_to_row))) {
-        return Status(Substitute("Error in skipping rows in file $0.", filename()));
+        return Status(Substitute(
+            "Parquet file might be corrupted: Error in skipping $0 values to row $1 "
+            "in column $2 of file $3.",
+            *num_rows_to_skip, *skip_to_row, col_reader->schema_element().name,
+            filename()));
       }
     }
     *num_rows_to_skip = 0;
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index 70e736e..ec84f08 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -1523,6 +1523,8 @@ void BaseScalarColumnReader::SetLevelDecodeError(
 /// For page filtering, we keep track of first and last page indexes and keep
 /// traversing to next page until we find a page that contains 'skip_row_id'.
 /// At that point, we can just skip to the required row id.
+/// If the page containing 'skip_row_id' is not a candidate page, we stop at
+/// the next candidate page; 'skip_row_id' is skipped along the way.
 /// Difference between scenario 2 and 3 is that in scenario 2, we end up
 /// decompressing all the pages being skipped, whereas in scenario 3 we only
 /// decompress pages required and avoid decompression needed. This is possible
@@ -1538,22 +1540,23 @@ bool BaseScalarColumnReader::SkipRowsInternal(int64_t num_rows, int64_t skip_row
         return false;
       }
     }
-    int last_row_idx = LastRowIdxInCurrentPage();
-    // Keep advancing until we hit reach required page containing 'skip_row_id'
-    while (skip_row_id > last_row_idx) {
+    // Keep advancing until we reach the page containing 'skip_row_id', or
+    // jump over it if that page is not a candidate page.
+    while (skip_row_id > LastRowIdxInCurrentPage()) {
       COUNTER_ADD(parent_->num_pages_skipped_by_late_materialization_counter_, 1);
       if (UNLIKELY(!JumpToNextPage())) {
         return false;
       }
-      last_row_idx = LastRowIdxInCurrentPage();
     }
-    DCHECK_GE(skip_row_id, FirstRowIdxInCurrentPage());
     int64_t last_row = LastProcessedRow();
     int64_t remaining = 0;
-    // Skip to the required row id within the page.
-    if (last_row < skip_row_id) {
-      if (UNLIKELY(!SkipTopLevelRows(skip_row_id - last_row, &remaining))) {
-        return false;
+    if (skip_row_id >= FirstRowIdxInCurrentPage()) {
+      // Skip to the required row id within the page. This is only needed when
+      // the row id is located in the current page.
+      if (last_row < skip_row_id) {
+        if (UNLIKELY(!SkipTopLevelRows(skip_row_id - last_row, &remaining))) {
+          return false;
+        }
       }
     }
     // also need to adjust 'candidate_row_ranges' as we skipped to new row id.
diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h
index 63ab694..51c0b50 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -380,7 +380,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// Collection of page indexes that we are going to read. When we use page filtering,
   /// we issue a scan-range with sub-ranges that belong to the candidate data pages, i.e.
   /// we will not even see the bytes of the filtered out pages.
-  /// It is set in HdfsParquetScanner::CalculateCandidatePagesForColumns().
+  /// It is set in HdfsParquetScanner::ComputeCandidatePagesForColumns().
   std::vector<int> candidate_data_pages_;
 
   /// Stores an index to 'candidate_data_pages_'. It is the currently read data page when
@@ -544,19 +544,21 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   }
 
   // Returns the last row index of the current page. It is one less than first row index
-  // of next page. For last page, it is one less than 'num_rows' of row group.
+  // of the next valid page. For last page, it is one less than 'num_rows' of row group.
   int64_t LastRowIdxInCurrentPage() const {
     DCHECK(!candidate_data_pages_.empty());
-    DCHECK_LE(candidate_page_idx_, candidate_data_pages_.size() - 1) ;
-    if (candidate_page_idx_ == candidate_data_pages_.size() - 1) {
-      parquet::RowGroup& row_group =
-          parent_->file_metadata_.row_groups[parent_->row_group_idx_];
-      return row_group.num_rows - 1;
-    } else {
-      return offset_index_.page_locations[candidate_data_pages_[candidate_page_idx_ + 1]]
-                 .first_row_index
-          - 1;
+    int64_t num_rows =
+        parent_->file_metadata_.row_groups[parent_->row_group_idx_].num_rows;
+    // Find the next valid page.
+    int page_idx = candidate_data_pages_[candidate_page_idx_] + 1;
+    while (page_idx < offset_index_.page_locations.size()) {
+      const auto& page_loc = offset_index_.page_locations[page_idx];
+      if (IsValidPageLocation(page_loc, num_rows)) {
+        return page_loc.first_row_index - 1;
+      }
+      ++page_idx;
     }
+    return num_rows - 1;
   }
 
   /// Wrapper around 'SkipTopLevelRows' to skip across multiple pages.
diff --git a/be/src/exec/parquet/parquet-common.cc b/be/src/exec/parquet/parquet-common.cc
index abb817f..34133ea 100644
--- a/be/src/exec/parquet/parquet-common.cc
+++ b/be/src/exec/parquet/parquet-common.cc
@@ -133,13 +133,6 @@ bool ComputeCandidateRanges(const int64_t num_rows, vector<RowRange>* skip_range
   return true;
 }
 
-inline bool IsValidPageLocation(const parquet::PageLocation& page_loc,
-    const int64_t num_rows) {
-  return page_loc.offset >= 0 &&
-         page_loc.first_row_index >= 0 &&
-         page_loc.first_row_index < num_rows;
-}
-
 static bool ValidatePageLocations(const vector<parquet::PageLocation>& page_locations,
     const int64_t num_rows) {
   int last_valid_idx = -1;
diff --git a/be/src/exec/parquet/parquet-common.h b/be/src/exec/parquet/parquet-common.h
index 1284722..0e54779 100644
--- a/be/src/exec/parquet/parquet-common.h
+++ b/be/src/exec/parquet/parquet-common.h
@@ -103,6 +103,13 @@ struct PageRange {
   }
 };
 
+inline bool IsValidPageLocation(const parquet::PageLocation& page_loc,
+    const int64_t num_rows) {
+  return page_loc.offset >= 0 &&
+         page_loc.first_row_index >= 0 &&
+         page_loc.first_row_index < num_rows;
+}
+
 /// Returns the row range for a given page range using information from the row group
 /// and offset index.
 void GetRowRangeForPageRange(const parquet::RowGroup& row_group,
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test
index f37a064..9bf7b53 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test
@@ -248,6 +248,64 @@ INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIM
 aggregation(SUM, NumStatsFilteredPages): 203
 ====
 ---- QUERY
+select * from alltypes_empty_pages where int_col=0
+---- RESULTS
+0,true,0,0,0,0,0.0,0.0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+10,true,0,0,0,0,0.0,0.0,'01/02/09','0',2009-01-02 00:10:00.450000000,2009,1
+20,true,0,0,0,0,0.0,0.0,'01/03/09','0',2009-01-03 00:20:00.900000000,2009,1
+30,true,0,0,0,0,0.0,0.0,'01/04/09','0',2009-01-04 00:30:01.350000000,2009,1
+40,true,0,0,0,0,0.0,0.0,'01/05/09','0',2009-01-05 00:40:01.800000000,2009,1
+50,true,0,0,0,0,0.0,0.0,'01/06/09','0',2009-01-06 00:50:02.250000000,2009,1
+60,true,0,0,0,0,0.0,0.0,'01/07/09','0',2009-01-07 01:00:02.700000000,2009,1
+70,true,0,0,0,0,0.0,0.0,'01/08/09','0',2009-01-08 01:10:03.150000000,2009,1
+80,true,0,0,0,0,0.0,0.0,'01/09/09','0',2009-01-09 01:20:03.600000000,2009,1
+90,true,0,0,0,0,0.0,0.0,'01/10/09','0',2009-01-10 01:30:04.500000000,2009,1
+100,true,0,0,0,0,0.0,0.0,'01/11/09','0',2009-01-11 01:40:04.500000000,2009,1
+110,true,0,0,0,0,0.0,0.0,'01/12/09','0',2009-01-12 01:50:04.950000000,2009,1
+120,true,0,0,0,0,0.0,0.0,'01/13/09','0',2009-01-13 02:00:05.400000000,2009,1
+130,true,0,0,0,0,0.0,0.0,'01/14/09','0',2009-01-14 02:10:05.850000000,2009,1
+140,true,0,0,0,0,0.0,0.0,'01/15/09','0',2009-01-15 02:20:06.300000000,2009,1
+150,true,0,0,0,0,0.0,0.0,'01/16/09','0',2009-01-16 02:30:06.750000000,2009,1
+160,true,0,0,0,0,0.0,0.0,'01/17/09','0',2009-01-17 02:40:07.200000000,2009,1
+170,true,0,0,0,0,0.0,0.0,'01/18/09','0',2009-01-18 02:50:07.650000000,2009,1
+180,true,0,0,0,0,0.0,0.0,'01/19/09','0',2009-01-19 03:00:08.100000000,2009,1
+190,true,0,0,0,0,0.0,0.0,'01/20/09','0',2009-01-20 03:10:08.550000000,2009,1
+200,true,0,0,0,0,0.0,0.0,'01/21/09','0',2009-01-21 03:20:09,2009,1
+210,true,0,0,0,0,0.0,0.0,'01/22/09','0',2009-01-22 03:30:09.450000000,2009,1
+220,true,0,0,0,0,0.0,0.0,'01/23/09','0',2009-01-23 03:40:09.900000000,2009,1
+230,true,0,0,0,0,0.0,0.0,'01/24/09','0',2009-01-24 03:50:10.350000000,2009,1
+240,true,0,0,0,0,0.0,0.0,'01/25/09','0',2009-01-25 04:00:10.800000000,2009,1
+250,true,0,0,0,0,0.0,0.0,'01/26/09','0',2009-01-26 04:10:11.250000000,2009,1
+260,true,0,0,0,0,0.0,0.0,'01/27/09','0',2009-01-27 04:20:11.700000000,2009,1
+270,true,0,0,0,0,0.0,0.0,'01/28/09','0',2009-01-28 04:30:12.150000000,2009,1
+280,true,0,0,0,0,0.0,0.0,'01/29/09','0',2009-01-29 04:40:12.600000000,2009,1
+290,true,0,0,0,0,0.0,0.0,'01/30/09','0',2009-01-30 04:50:13.500000000,2009,1
+300,true,0,0,0,0,0.0,0.0,'01/31/09','0',2009-01-31 05:00:13.500000000,2009,1
+310,true,0,0,0,0,0.0,0.0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+320,true,0,0,0,0,0.0,0.0,'02/02/09','0',2009-02-02 00:10:00.450000000,2009,2
+330,true,0,0,0,0,0.0,0.0,'02/03/09','0',2009-02-03 00:20:00.900000000,2009,2
+340,true,0,0,0,0,0.0,0.0,'02/04/09','0',2009-02-04 00:30:01.350000000,2009,2
+350,true,0,0,0,0,0.0,0.0,'02/05/09','0',2009-02-05 00:40:01.800000000,2009,2
+360,true,0,0,0,0,0.0,0.0,'02/06/09','0',2009-02-06 00:50:02.250000000,2009,2
+370,true,0,0,0,0,0.0,0.0,'02/07/09','0',2009-02-07 01:00:02.700000000,2009,2
+380,true,0,0,0,0,0.0,0.0,'02/08/09','0',2009-02-08 01:10:03.150000000,2009,2
+390,true,0,0,0,0,0.0,0.0,'02/09/09','0',2009-02-09 01:20:03.600000000,2009,2
+400,true,0,0,0,0,0.0,0.0,'02/10/09','0',2009-02-10 01:30:04.500000000,2009,2
+410,true,0,0,0,0,0.0,0.0,'02/11/09','0',2009-02-11 01:40:04.500000000,2009,2
+420,true,0,0,0,0,0.0,0.0,'02/12/09','0',2009-02-12 01:50:04.950000000,2009,2
+430,true,0,0,0,0,0.0,0.0,'02/13/09','0',2009-02-13 02:00:05.400000000,2009,2
+440,true,0,0,0,0,0.0,0.0,'02/14/09','0',2009-02-14 02:10:05.850000000,2009,2
+450,true,0,0,0,0,0.0,0.0,'02/15/09','0',2009-02-15 02:20:06.300000000,2009,2
+460,true,0,0,0,0,0.0,0.0,'02/16/09','0',2009-02-16 02:30:06.750000000,2009,2
+470,true,0,0,0,0,0.0,0.0,'02/17/09','0',2009-02-17 02:40:07.200000000,2009,2
+480,true,0,0,0,0,0.0,0.0,'02/18/09','0',2009-02-18 02:50:07.650000000,2009,2
+490,true,0,0,0,0,0.0,0.0,'02/19/09','0',2009-02-19 03:00:08.100000000,2009,2
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumStatsFilteredPages): 17
+====
+---- QUERY
 # Query table with invalid offset index.
 set abort_on_error=1;
 select * from alltypes_invalid_pages where id = 109
@@ -336,4 +394,4 @@ BIGINT
 drop table lineitem_comment;
 ---- RESULTS
 'Table has been dropped.'
-====
\ No newline at end of file
+====
diff --git a/tests/query_test/test_parquet_stats.py b/tests/query_test/test_parquet_stats.py
index 28a2288..c9663af 100644
--- a/tests/query_test/test_parquet_stats.py
+++ b/tests/query_test/test_parquet_stats.py
@@ -101,7 +101,7 @@ class TestParquetStats(ImpalaTestSuite):
       self.run_test_case('QueryTest/parquet-page-index-alltypes-tiny-pages-plain',
                          new_vector, unique_database)
 
-    for batch_size in [0, 32]:
+    for batch_size in [0, 1, 2, 3, 4, 8, 16, 32, 64, 128, 256, 512]:
       new_vector.get_value('exec_option')['batch_size'] = batch_size
       self.run_test_case('QueryTest/parquet-page-index-large', new_vector,
                          unique_database)