You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/08/08 23:37:19 UTC
[impala] 22/27: IMPALA-10267: Properly handle continued scanning after parse error in HdfsAvroScanner
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 06bdfa3ba7f17eddbad1aab3552f1111c6290556
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Thu Jun 9 17:18:32 2022 +0200
IMPALA-10267: Properly handle continued scanning after parse error in HdfsAvroScanner
BaseSequenceScanner assumes that overrides of ProcessRange() can continue
working after a parse error (some bytes can be skipped in this case
until the next sync marker), but HdfsAvroScanner::ProcessRange() did not
handle this scenario.
Testing:
- ran core tests
- didn't create new tests - the bug was caught by a fuzz test;
it would be great to check whether we actually handle this
scenario, but I am not that familiar with Avro
Change-Id: I361c7ea781cd84033b57ea9ca437378636fda91f
Reviewed-on: http://gerrit.cloudera.org:8080/18610
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
Tested-by: Csaba Ringhofer <cs...@cloudera.com>
---
be/src/exec/hdfs-avro-scanner.cc | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index efd638f99..a8fa737ef 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -502,8 +502,15 @@ Status HdfsAvroScanner::ProcessRange(RowBatch* row_batch) {
DCHECK_GT(row_batch->capacity(), row_batch->num_rows());
while (!eos_ && !scan_node_->ReachedLimitShared()) {
if (record_pos_ == num_records_in_block_) {
- // Read new data block
- RETURN_IF_FALSE(stream_->ReadZLong(&num_records_in_block_, &parse_status_));
+ // Read new data block. Reset members first to avoid corrupt state after
+ // recovery from parse error.
+ record_pos_ = 0;
+ num_records_in_block_ = 0;
+ data_block_len_ = 0;
+ data_block_ = nullptr;
+ data_block_end_ = nullptr;
+ int64_t num_records_in_block;
+ RETURN_IF_FALSE(stream_->ReadZLong(&num_records_in_block, &parse_status_));
if (num_records_in_block_ < 0) {
return Status(TErrorCode::AVRO_INVALID_RECORD_COUNT, stream_->filename(),
num_records_in_block_, stream_->file_offset());
@@ -531,8 +538,8 @@ Status HdfsAvroScanner::ProcessRange(RowBatch* row_batch) {
data_block_ = compressed_data;
data_block_len_ = compressed_size;
}
+ num_records_in_block_ = num_records_in_block;
data_block_end_ = data_block_ + data_block_len_;
- record_pos_ = 0;
}
int64_t prev_record_pos = record_pos_;