Posted to commits@impala.apache.org by st...@apache.org on 2022/03/18 14:21:36 UTC

[impala] branch master updated: IMPALA-11182: catch exceptions of orc::RowReader::createRowBatch

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 21ce4fb  IMPALA-11182: catch exceptions of orc::RowReader::createRowBatch
21ce4fb is described below

commit 21ce4fb130b9f47c55ca4769f4c41ab8eb258b56
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Tue Mar 15 11:23:03 2022 +0800

    IMPALA-11182: catch exceptions of orc::RowReader::createRowBatch
    
    The ORC lib uses exceptions to report failures. Exception handling was
    missing around the call to orc::RowReader::createRowBatch, which
    allocates memory and can throw when that allocation fails. This patch
    simply adds a catch clause for it.
    
    To simplify the code, a macro is added for catching ORC exceptions
    with a given message format.
    
    Tests:
     - run test_scanner_fuzz.py 20 times
    
    Change-Id: I76e36a238220e7bed1cbbdcb3fc7d35394bfa023
    Reviewed-on: http://gerrit.cloudera.org:8080/18321
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-orc-scanner.cc | 79 +++++++++++++++--------------------------
 1 file changed, 28 insertions(+), 51 deletions(-)
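
The core of the patch is the catch-clause macro shown in the first hunk below: each
use of the macro is written immediately after a try block, so the macro expansion
completes the try/catch statement. The stand-alone sketch below illustrates the shape
of that idiom. The names here (Status, RETURN_ON_EXCEPTION_DEMO, CreateBatch) are
simplified stand-ins for illustration, not the actual Impala types.

    #include <exception>
    #include <iostream>
    #include <stdexcept>
    #include <string>

    // Minimal stand-in for impala::Status: an empty message means success.
    struct Status {
      std::string msg;
      bool ok() const { return msg.empty(); }
    };

    // Expands to a catch clause, so a use of the macro must directly follow
    // a `try { ... }` block, mirroring RETURN_ON_ORC_EXCEPTION in the patch.
    #define RETURN_ON_EXCEPTION_DEMO(context_msg)                   \
      catch (const std::exception& e) {                             \
        return Status{std::string(context_msg) + ": " + e.what()};  \
      }

    Status CreateBatch(bool simulate_failure) {
      try {
        // Stand-in for a throwing call such as orc::RowReader::createRowBatch.
        if (simulate_failure) throw std::runtime_error("simulated allocation failure");
        // ... allocate and use the batch here ...
      } RETURN_ON_EXCEPTION_DEMO("Error creating ORC row batch")
      return Status{};
    }

    int main() {
      std::cout << CreateBatch(true).msg << std::endl;                // wrapped error
      std::cout << (CreateBatch(false).ok() ? "OK" : "not OK") << std::endl;
    }

In the actual scanner, the macro additionally distinguishes Impala's ResourceError
from other exceptions thrown by the ORC lib, stores the result in parse_status_, and
logs it via VLOG_QUERY, as the first hunk of the diff shows.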

diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index a841537..5809ac7 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -42,6 +42,20 @@ using namespace impala::io;
 
 namespace impala {
 
+/// Generic wrapper to catch exceptions thrown from the ORC lib.
+/// ResourceError is thrown by the OrcMemPool of our orc-scanner.
+/// Other exceptions, e.g. orc::ParseError, are thrown by the ORC lib.
+#define RETURN_ON_ORC_EXCEPTION(msg_format)                       \
+  catch (ResourceError& e) {                                      \
+    parse_status_ = e.GetStatus();                                \
+    return parse_status_;                                         \
+  } catch (std::exception& e) {                                   \
+    string msg = Substitute(msg_format, filename(), e.what());    \
+    parse_status_ = Status(msg);                                  \
+    VLOG_QUERY << parse_status_.msg().msg();                      \
+    return parse_status_;                                         \
+  }
+
 Status HdfsOrcScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
   DCHECK(!files.empty());
@@ -203,15 +217,7 @@ Status HdfsOrcScanner::StartColumnReading(const orc::StripeInformation& stripe)
       columnRanges_.emplace_back(stream->getLength(), stream->getOffset(),
           stream->getKind(), stream->getColumnId(), this);
     }
-  } catch (ResourceError& e) { // errors throw from the orc scanner
-    parse_status_ = e.GetStatus();
-    return parse_status_;
-  } catch (std::exception& e) { // other errors throw from the orc library
-    string msg = Substitute(
-        "Encountered parse error in tail of ORC file $0: $1", filename(), e.what());
-    parse_status_ = Status(msg);
-    return parse_status_;
-  }
+  } RETURN_ON_ORC_EXCEPTION("Encountered parse error in tail of ORC file $0: $1");
 
   // Sort and check that there is no overlapping range in columnRanges_.
   sort(columnRanges_.begin(), columnRanges_.end());
@@ -431,12 +437,9 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
     }
     orc_root_reader_ = this->obj_pool_.Add(
         new OrcStructReader(root_type, scan_node_->tuple_desc(), this));
-  } catch (std::exception& e) {
-    string msg = Substitute("Encountered parse error during schema selection in "
-        "ORC file $0: $1", filename(), e.what());
-    parse_status_ = Status(msg);
-    return parse_status_;
-  }
+  } RETURN_ON_ORC_EXCEPTION(
+      "Encountered parse error during schema selection in ORC file $0: $1");
+
   // Set top-level template tuple.
   template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
   return Status::OK();
@@ -499,15 +502,7 @@ Status HdfsOrcScanner::ProcessFileTail() {
     VLOG_FILE << "Processing FileTail of ORC file: " << input_stream->getName()
               << ", file_length: " << input_stream->getLength();
     reader_ = orc::createReader(move(input_stream), reader_options_);
-  } catch (ResourceError& e) {  // errors throw from the orc scanner
-    parse_status_ = e.GetStatus();
-    return parse_status_;
-  } catch (std::exception& e) { // other errors throw from the orc library
-    string msg = Substitute("Encountered parse error in tail of ORC file $0: $1",
-        filename(), e.what());
-    parse_status_ = Status(msg);
-    return parse_status_;
-  }
+  } RETURN_ON_ORC_EXCEPTION("Encountered parse error in tail of ORC file $0: $1");
 
   if (reader_->getNumberOfRows() == 0)  return Status::OK();
   if (reader_->getNumberOfStripes() == 0) {
@@ -921,15 +916,7 @@ Status HdfsOrcScanner::NextStripe() {
     RETURN_IF_ERROR(PrepareSearchArguments());
     try {
       row_reader_ = reader_->createRowReader(row_reader_options_);
-    } catch (ResourceError& e) {  // errors throw from the orc scanner
-      parse_status_ = e.GetStatus();
-      return parse_status_;
-    } catch (std::exception& e) { // errors throw from the orc library
-      parse_status_ = Status(Substitute(
-          "Error in creating column readers for ORC file $0: $1.", filename(), e.what()));
-      VLOG_QUERY << parse_status_.msg().msg();
-      return parse_status_;
-    }
+    } RETURN_ON_ORC_EXCEPTION("Error in creating column readers for ORC file $0: $1.");
     end_of_stripe_ = false;
     VLOG_ROW << Substitute("Created RowReader for stripe(offset=$0, len=$1) in file $2",
         stripe->getOffset(), stripe_len, filename());
@@ -947,8 +934,10 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
   // We're going to free the previous batch. Clear the reference first.
   RETURN_IF_ERROR(orc_root_reader_->UpdateInputBatch(nullptr));
 
-  orc_root_batch_ = row_reader_->createRowBatch(row_batch->capacity());
-  DCHECK_EQ(orc_root_batch_->numElements, 0);
+  try {
+    orc_root_batch_ = row_reader_->createRowBatch(row_batch->capacity());
+    DCHECK_EQ(orc_root_batch_->numElements, 0);
+  } RETURN_ON_ORC_EXCEPTION("Encounter error in creating ORC row batch for file $0: $1.");
 
   int64_t num_rows_read = 0;
   while (continue_execution) {  // one ORC batch (ColumnVectorBatch) in a round
@@ -963,16 +952,8 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
           orc_root_reader_->SetFileRowIndex(row_reader_->getRowNumber());
         }
         if (end_of_stripe_) break; // no more data to process
-      } catch (ResourceError& e) {
-        parse_status_ = e.GetStatus();
-        return parse_status_;
-      } catch (std::exception& e) {
-        parse_status_ = Status(Substitute(
-            "Encounter parse error in ORC file $0: $1.", filename(), e.what()));
-        VLOG_QUERY << parse_status_.msg().msg();
-        eos_ = true;
-        return parse_status_;
-      }
+      } RETURN_ON_ORC_EXCEPTION("Encounter parse error in ORC file $0: $1.");
+
       if (orc_root_batch_->numElements == 0) {
         RETURN_IF_ERROR(CommitRows(0, row_batch));
         end_of_stripe_ = true;
@@ -1390,12 +1371,8 @@ Status HdfsOrcScanner::PrepareSearchArguments() {
       VLOG_FILE << "Built search arguments for ORC file: " << filename() << ": "
           << final_sarg->toString() << ". File schema: " << reader_->getType().toString();
       row_reader_options_.searchArgument(std::move(final_sarg));
-    } catch (std::exception& e) {
-      string msg = Substitute("Encountered parse error during building search arguments "
-          "in ORC file $0: $1", filename(), e.what());
-      parse_status_ = Status(msg);
-      return parse_status_;
-    }
+    } RETURN_ON_ORC_EXCEPTION(
+        "Encountered parse error during building search arguments in ORC file $0: $1");
   }
   // Free any expr result allocations accumulated during conjunct evaluation.
   context_->expr_results_pool()->Clear();