You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/08/08 23:37:19 UTC

[impala] 22/27: IMPALA-10267: Properly handle continued scanning after parse error in HdfsAvroScanner

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 06bdfa3ba7f17eddbad1aab3552f1111c6290556
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Thu Jun 9 17:18:32 2022 +0200

    IMPALA-10267: Properly handle continued scanning after parse error in HdfsAvroScanner
    
    BaseSequenceScanner assumes that overrides of ProcessRange can continue
    working after a parse error (in this case some bytes may be skipped
    until the next sync marker), but HdfsAvroScanner::ProcessRange() did not
    handle this scenario.
    
    Testing:
    - ran core tests
    - didn't create new tests - the issue was caught by a fuzz test;
      it would be great to check whether we actually handle this
      scenario, but I am not that familiar with Avro
    
    Change-Id: I361c7ea781cd84033b57ea9ca437378636fda91f
    Reviewed-on: http://gerrit.cloudera.org:8080/18610
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Csaba Ringhofer <cs...@cloudera.com>
---
 be/src/exec/hdfs-avro-scanner.cc | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index efd638f99..a8fa737ef 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -502,8 +502,15 @@ Status HdfsAvroScanner::ProcessRange(RowBatch* row_batch) {
   DCHECK_GT(row_batch->capacity(), row_batch->num_rows());
   while (!eos_ && !scan_node_->ReachedLimitShared()) {
     if (record_pos_ == num_records_in_block_) {
-      // Read new data block
-      RETURN_IF_FALSE(stream_->ReadZLong(&num_records_in_block_, &parse_status_));
+      // Read new data block. Reset members first to avoid corrupt state after
+      // recovery from parse error.
+      record_pos_ = 0;
+      num_records_in_block_ = 0;
+      data_block_len_ = 0;
+      data_block_ = nullptr;
+      data_block_end_ = nullptr;
+      int64_t num_records_in_block;
+      RETURN_IF_FALSE(stream_->ReadZLong(&num_records_in_block, &parse_status_));
       if (num_records_in_block_ < 0) {
         return Status(TErrorCode::AVRO_INVALID_RECORD_COUNT, stream_->filename(),
             num_records_in_block_, stream_->file_offset());
@@ -531,8 +538,8 @@ Status HdfsAvroScanner::ProcessRange(RowBatch* row_batch) {
         data_block_ = compressed_data;
         data_block_len_ = compressed_size;
       }
+      num_records_in_block_ = num_records_in_block;
       data_block_end_ = data_block_ + data_block_len_;
-      record_pos_ = 0;
     }
 
     int64_t prev_record_pos = record_pos_;