You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/09/28 02:38:45 UTC

[impala] 02/02: IMPALA-10894: Pushing down predicates in reading "original files" of ACID tables

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d7068ace15b5c7affe0812155f037789905ef74d
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Mon Sep 27 10:12:29 2021 +0800

    IMPALA-10894: Pushing down predicates in reading "original files" of ACID tables
    
    ACID tables can have "original files" that don't have full ACID schema.
    For instance, if we upgrade a non-ACID table to full ACID, the original
    files won't be changed so they don't have ACID columns, i.e. operation,
    originalTransaction, bucket, rowid, and currentTransaction.
    
    Besides rowid, the other 4 columns can be calculated based on the file
    path. We calculate the rowid as the row index inside the file. This is
    done by setting a first row id for the split; the OrcStructReader then
    fills the rowid slot with values auto-incremented by one.
    
    However, if we push down predicates into the ORC reader, some rows may
    be skipped. The ORC lib guarantees that rows in a returned batch are
    consecutive. But consecutive batches may skip rows in the middle. So we
    can't simply auto-increment the first row id by 1 to calculate the row
    index. Instead, we should use orc::RowReader::getRowNumber() to update
    the first row index of the returned batch.
    
    This patch changes the row index initialization logic to use
    orc::RowReader::getRowNumber(), and removes the branch that skipped
    pushing down predicates in such cases.
    
    Tests:
     - Ran test_full_acid_original_files
    
    Change-Id: I5bfdb624fcaf62ffa22f53025761b9dee3fe58a2
    Reviewed-on: http://gerrit.cloudera.org:8080/17870
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-orc-scanner.cc | 22 +++++-----------------
 1 file changed, 5 insertions(+), 17 deletions(-)

diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index e6647cd..99a8fb9 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -708,9 +708,6 @@ Status HdfsOrcScanner::NextStripe() {
   advance_stripe_ = false;
   stripe_rows_read_ = 0;
 
-  bool first_invocation = stripe_idx_ == -1;
-  int64_t skipped_rows = 0;
-
   // Loop until we have found a non-empty stripe.
   while (true) {
     // Reset the parse status for the next stripe.
@@ -740,17 +737,11 @@ Status HdfsOrcScanner::NextStripe() {
         stripe_mid_pos < split_offset + split_length)) {
       // Middle pos not in split, this stripe will be handled by a different scanner.
       // Mark if the stripe overlaps with the split.
-      if (first_invocation) skipped_rows += stripe->getNumberOfRows();
       misaligned_stripe_skipped |= CheckStripeOverlapsSplit(stripe_offset,
           stripe_offset + stripe_len, split_offset, split_offset + split_length);
       continue;
     }
 
-    // Set the file row index in 'orc_root_reader_' accordingly.
-    if (first_invocation && acid_synthetic_rowid_ != nullptr) {
-      orc_root_reader_->SetFileRowIndex(skipped_rows);
-    }
-
     COUNTER_ADD(num_stripes_counter_, 1);
     row_reader_options_.range(stripe->getOffset(), stripe_len);
     try {
@@ -791,6 +782,11 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
       try {
         end_of_stripe_ |= !row_reader_->next(*orc_root_batch_);
         RETURN_IF_ERROR(orc_root_reader_->UpdateInputBatch(orc_root_batch_.get()));
+        if (acid_synthetic_rowid_ != nullptr) {
+          // Set the first row index of the batch. The ORC reader guarantees that rows
+          // are consecutive in the returned batch.
+          orc_root_reader_->SetFileRowIndex(row_reader_->getRowNumber());
+        }
         if (end_of_stripe_) break; // no more data to process
       } catch (ResourceError& e) {
         parse_status_ = e.GetStatus();
@@ -1053,14 +1049,6 @@ Status HdfsOrcScanner::PrepareSearchArguments() {
 
   const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
   if (!min_max_tuple_desc) return Status::OK();
-  // TODO(IMPALA-10894): pushing down predicates into the ORC reader will mess up the
-  //  synthetic(fake) row id, because the row index in the returned batch might differ
-  //  from the index in file (due to some rows are skipped).
-  if (acid_synthetic_rowid_ != nullptr) {
-    VLOG_FILE << "Skip pushing down predicates on non-ACID ORC files under an ACID "
-                 "table: " << filename();
-    return Status::OK();
-  }
 
   // Clone the min/max statistics conjuncts.
   RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,