You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/03/18 14:21:36 UTC
[impala] branch master updated: IMPALA-11182: catch exceptions of orc::RowReader::createRowBatch
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 21ce4fb IMPALA-11182: catch exceptions of orc::RowReader::createRowBatch
21ce4fb is described below
commit 21ce4fb130b9f47c55ca4769f4c41ab8eb258b56
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Tue Mar 15 11:23:03 2022 +0800
IMPALA-11182: catch exceptions of orc::RowReader::createRowBatch
The ORC lib uses exceptions to report failures. We are missing
exception handling when invoking orc::RowReader::createRowBatch, which
requires memory allocation and can raise an exception when it fails.
This patch simply adds a catch clause for it.
To simplify the code, a macro is added for catching the ORC exceptions
with a given message format.
Tests:
- run test_scanner_fuzz.py 20 times
Change-Id: I76e36a238220e7bed1cbbdcb3fc7d35394bfa023
Reviewed-on: http://gerrit.cloudera.org:8080/18321
Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/hdfs-orc-scanner.cc | 79 +++++++++++++++--------------------------
1 file changed, 28 insertions(+), 51 deletions(-)
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index a841537..5809ac7 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -42,6 +42,20 @@ using namespace impala::io;
namespace impala {
+/// Generic wrapper to catch exceptions thrown from the ORC lib.
+/// ResourceError is thrown by the OrcMemPool of our orc-scanner.
+/// Other exceptions, e.g. orc::ParseError, are thrown by the ORC lib.
+#define RETURN_ON_ORC_EXCEPTION(msg_format) \
+ catch (ResourceError& e) { \
+ parse_status_ = e.GetStatus(); \
+ return parse_status_; \
+ } catch (std::exception& e) { \
+ string msg = Substitute(msg_format, filename(), e.what()); \
+ parse_status_ = Status(msg); \
+ VLOG_QUERY << parse_status_.msg().msg(); \
+ return parse_status_; \
+ }
+
Status HdfsOrcScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
const vector<HdfsFileDesc*>& files) {
DCHECK(!files.empty());
@@ -203,15 +217,7 @@ Status HdfsOrcScanner::StartColumnReading(const orc::StripeInformation& stripe)
columnRanges_.emplace_back(stream->getLength(), stream->getOffset(),
stream->getKind(), stream->getColumnId(), this);
}
- } catch (ResourceError& e) { // errors throw from the orc scanner
- parse_status_ = e.GetStatus();
- return parse_status_;
- } catch (std::exception& e) { // other errors throw from the orc library
- string msg = Substitute(
- "Encountered parse error in tail of ORC file $0: $1", filename(), e.what());
- parse_status_ = Status(msg);
- return parse_status_;
- }
+ } RETURN_ON_ORC_EXCEPTION("Encountered parse error in tail of ORC file $0: $1");
// Sort and check that there is no overlapping range in columnRanges_.
sort(columnRanges_.begin(), columnRanges_.end());
@@ -431,12 +437,9 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
}
orc_root_reader_ = this->obj_pool_.Add(
new OrcStructReader(root_type, scan_node_->tuple_desc(), this));
- } catch (std::exception& e) {
- string msg = Substitute("Encountered parse error during schema selection in "
- "ORC file $0: $1", filename(), e.what());
- parse_status_ = Status(msg);
- return parse_status_;
- }
+ } RETURN_ON_ORC_EXCEPTION(
+ "Encountered parse error during schema selection in ORC file $0: $1");
+
// Set top-level template tuple.
template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
return Status::OK();
@@ -499,15 +502,7 @@ Status HdfsOrcScanner::ProcessFileTail() {
VLOG_FILE << "Processing FileTail of ORC file: " << input_stream->getName()
<< ", file_length: " << input_stream->getLength();
reader_ = orc::createReader(move(input_stream), reader_options_);
- } catch (ResourceError& e) { // errors throw from the orc scanner
- parse_status_ = e.GetStatus();
- return parse_status_;
- } catch (std::exception& e) { // other errors throw from the orc library
- string msg = Substitute("Encountered parse error in tail of ORC file $0: $1",
- filename(), e.what());
- parse_status_ = Status(msg);
- return parse_status_;
- }
+ } RETURN_ON_ORC_EXCEPTION("Encountered parse error in tail of ORC file $0: $1");
if (reader_->getNumberOfRows() == 0) return Status::OK();
if (reader_->getNumberOfStripes() == 0) {
@@ -921,15 +916,7 @@ Status HdfsOrcScanner::NextStripe() {
RETURN_IF_ERROR(PrepareSearchArguments());
try {
row_reader_ = reader_->createRowReader(row_reader_options_);
- } catch (ResourceError& e) { // errors throw from the orc scanner
- parse_status_ = e.GetStatus();
- return parse_status_;
- } catch (std::exception& e) { // errors throw from the orc library
- parse_status_ = Status(Substitute(
- "Error in creating column readers for ORC file $0: $1.", filename(), e.what()));
- VLOG_QUERY << parse_status_.msg().msg();
- return parse_status_;
- }
+ } RETURN_ON_ORC_EXCEPTION("Error in creating column readers for ORC file $0: $1.");
end_of_stripe_ = false;
VLOG_ROW << Substitute("Created RowReader for stripe(offset=$0, len=$1) in file $2",
stripe->getOffset(), stripe_len, filename());
@@ -947,8 +934,10 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
// We're going to free the previous batch. Clear the reference first.
RETURN_IF_ERROR(orc_root_reader_->UpdateInputBatch(nullptr));
- orc_root_batch_ = row_reader_->createRowBatch(row_batch->capacity());
- DCHECK_EQ(orc_root_batch_->numElements, 0);
+ try {
+ orc_root_batch_ = row_reader_->createRowBatch(row_batch->capacity());
+ DCHECK_EQ(orc_root_batch_->numElements, 0);
+ } RETURN_ON_ORC_EXCEPTION("Encounter error in creating ORC row batch for file $0: $1.");
int64_t num_rows_read = 0;
while (continue_execution) { // one ORC batch (ColumnVectorBatch) in a round
@@ -963,16 +952,8 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
orc_root_reader_->SetFileRowIndex(row_reader_->getRowNumber());
}
if (end_of_stripe_) break; // no more data to process
- } catch (ResourceError& e) {
- parse_status_ = e.GetStatus();
- return parse_status_;
- } catch (std::exception& e) {
- parse_status_ = Status(Substitute(
- "Encounter parse error in ORC file $0: $1.", filename(), e.what()));
- VLOG_QUERY << parse_status_.msg().msg();
- eos_ = true;
- return parse_status_;
- }
+ } RETURN_ON_ORC_EXCEPTION("Encounter parse error in ORC file $0: $1.");
+
if (orc_root_batch_->numElements == 0) {
RETURN_IF_ERROR(CommitRows(0, row_batch));
end_of_stripe_ = true;
@@ -1390,12 +1371,8 @@ Status HdfsOrcScanner::PrepareSearchArguments() {
VLOG_FILE << "Built search arguments for ORC file: " << filename() << ": "
<< final_sarg->toString() << ". File schema: " << reader_->getType().toString();
row_reader_options_.searchArgument(std::move(final_sarg));
- } catch (std::exception& e) {
- string msg = Substitute("Encountered parse error during building search arguments "
- "in ORC file $0: $1", filename(), e.what());
- parse_status_ = Status(msg);
- return parse_status_;
- }
+ } RETURN_ON_ORC_EXCEPTION(
+ "Encountered parse error during building search arguments in ORC file $0: $1");
}
// Free any expr result allocations accumulated during conjunct evaluation.
context_->expr_results_pool()->Clear();