You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by as...@apache.org on 2020/02/28 23:03:20 UTC

[impala] 01/03: IMPALA-9226: Improve string allocations of the ORC scanner

This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c48efd407e7b857c6df0d167aafe02f93c81e2fb
Author: norbert.luksa <no...@cloudera.com>
AuthorDate: Mon Jan 13 16:19:41 2020 +0100

    IMPALA-9226: Improve string allocations of the ORC scanner
    
    Currently the OrcColumnReader copies values from the
    orc::StringVectorBatch one-by-one. Since ORC 1.6, the blob which
    contains the pointed values is moved to the StringVectorBatch,
    so we can copy it.
    
    This commit beside the above improvement also enables the
    LazyEncoding option for the ORC reader. This way, for stripes
    with DICTIONARY_ENCODING[_V2], EncodedStringVectorBatch contains
    the data in a dictionaryBlob from which the data can be acquired
    with the given indices and lengths.
    
    Tests:
     * Run ORC scanner tests (query_tests/test_scanners.py::TestOrc)
       and tpch query tests.
     * Tested performance on tpch.lineitem table with scale=25,
       running queries that selects min of string columns.
       Some results:
       col_name     | encoding | before | after | speedup
       =============================================================
       l_comment      DIRECT     16.42s   14.38s  14%
       l_shipinstruct DICTIONARY 5.26s    3.80s   32%
       l_commitdate   DICTIONARY 5.46s    5.19s   5%
       all string col BOTH       39.06s   32.18s  21%
    
       The queries were run on a desktop PC with MT_DOP and NUM_NODES
       set to 1.
     * Also run TPC-H queries on the TPC-H benchmark where some
       queries' runtime improved by around 10-15%, while there were
       no regression for the others.
    
    Change-Id: If2d975946fb6f4104d8dc98895285b3a0c6bef7f
    Reviewed-on: http://gerrit.cloudera.org:8080/15051
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-orc-scanner.cc   | 21 ++++++++--
 be/src/exec/hdfs-orc-scanner.h    |  7 ++++
 be/src/exec/orc-column-readers.cc | 83 +++++++++++++++++++++++++++------------
 be/src/exec/orc-column-readers.h  | 66 ++++++++++++++++++++++++-------
 4 files changed, 135 insertions(+), 42 deletions(-)

diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index a38b345..1a607f3 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -134,6 +134,8 @@ void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
 
 HdfsOrcScanner::HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
   : HdfsScanner(scan_node, state),
+    dictionary_pool_(new MemPool(scan_node->mem_tracker())),
+    data_batch_pool_(new MemPool(scan_node->mem_tracker())),
     assemble_rows_timer_(scan_node_->materialize_tuple_timer()) {
   assemble_rows_timer_.Stop();
 }
@@ -184,6 +186,10 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
   // Update 'row_reader_options_' based on the tuple descriptor so the ORC lib can skip
   // columns we don't need.
   RETURN_IF_ERROR(SelectColumns(*scan_node_->tuple_desc()));
+  // By enabling lazy decoding, String stripes with DICTIONARY_ENCODING[_V2] can be
+  // stored in an EncodedStringVectorBatch, where the data is stored in a dictionary
+  // blob more efficiently.
+  row_reader_options_.setEnableLazyDecoding(true);
 
   // Build 'col_id_path_map_' that maps from ORC column ids to their corresponding
   // SchemaPath in the table. The map is used in the constructors of OrcColumnReaders
@@ -226,18 +232,24 @@ void HdfsOrcScanner::Close(RowBatch* row_batch) {
   if (row_batch != nullptr) {
     context_->ReleaseCompletedResources(true);
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
+    row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
+    row_batch->tuple_data_pool()->AcquireData(data_batch_pool_.get(), false);
     if (scan_node_->HasRowBatchQueue()) {
       static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
           unique_ptr<RowBatch>(row_batch));
     }
   } else {
     template_tuple_pool_->FreeAll();
+    dictionary_pool_->FreeAll();
+    data_batch_pool_->FreeAll();
     context_->ReleaseCompletedResources(true);
   }
   orc_root_batch_.reset(nullptr);
 
   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
+  DCHECK_EQ(dictionary_pool_->total_allocated_bytes(), 0);
+  DCHECK_EQ(data_batch_pool_->total_allocated_bytes(), 0);
 
   assemble_rows_timer_.Stop();
   assemble_rows_timer_.ReleaseCounter();
@@ -501,8 +513,10 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
   // to can be skip. 'end_of_stripe_' marks whether current stripe is drained. It's only
   // set to true in 'AssembleRows'.
   while (advance_stripe_ || end_of_stripe_) {
+    // The next stripe will use a new dictionary blob so transfer the memory to row_batch.
+    row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
     context_->ReleaseCompletedResources(/* done */ true);
-    // Commit the rows to flush the row batch from the previous stripe
+    // Commit the rows to flush the row batch from the previous stripe.
     RETURN_IF_ERROR(CommitRows(0, row_batch));
 
     RETURN_IF_ERROR(NextStripe());
@@ -617,7 +631,7 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
   if (!continue_execution) return Status::CancelledInternal("ORC scanner");
 
   // We're going to free the previous batch. Clear the reference first.
-  orc_root_reader_->UpdateInputBatch(nullptr);
+  RETURN_IF_ERROR(orc_root_reader_->UpdateInputBatch(nullptr));
 
   orc_root_batch_ = row_reader_->createRowBatch(row_batch->capacity());
   DCHECK_EQ(orc_root_batch_->numElements, 0);
@@ -625,9 +639,10 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
   int64_t num_rows_read = 0;
   while (continue_execution) {  // one ORC batch (ColumnVectorBatch) in a round
     if (orc_root_reader_->EndOfBatch()) {
+      row_batch->tuple_data_pool()->AcquireData(data_batch_pool_.get(), false);
       try {
         end_of_stripe_ |= !row_reader_->next(*orc_root_batch_);
-        orc_root_reader_->UpdateInputBatch(orc_root_batch_.get());
+        RETURN_IF_ERROR(orc_root_reader_->UpdateInputBatch(orc_root_batch_.get()));
         if (end_of_stripe_) break; // no more data to process
       } catch (ResourceError& e) {
         parse_status_ = e.GetStatus();
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index d26ca88..a6ba523 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -170,6 +170,13 @@ class HdfsOrcScanner : public HdfsScanner {
   /// Mem pool used in orc readers.
   boost::scoped_ptr<OrcMemPool> reader_mem_pool_;
 
+  /// Pool to copy dictionary buffer into.
+  /// This pool is shared across all the batches in a stripe.
+  boost::scoped_ptr<MemPool> dictionary_pool_;
+  /// Pool to copy non-dictionary buffer into. This pool is responsible for handling
+  /// vector batches that do not necessarily fit into one row batch.
+  boost::scoped_ptr<MemPool> data_batch_pool_;
+
   std::unique_ptr<OrcSchemaResolver> schema_resolver_ = nullptr;
 
   /// orc::Reader's responsibility is to read the footer and metadata from an ORC file.
diff --git a/be/src/exec/orc-column-readers.cc b/be/src/exec/orc-column-readers.cc
index 6216d10..99608af 100644
--- a/be/src/exec/orc-column-readers.cc
+++ b/be/src/exec/orc-column-readers.cc
@@ -149,13 +149,45 @@ Status OrcBoolColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
   return Status::OK();
 }
 
+Status OrcStringColumnReader::InitBlob(orc::DataBuffer<char>* blob, MemPool* pool) {
+  // TODO: IMPALA-9310: Possible improvement is moving the buffer out from orc::DataBuffer
+  // instead of copying and let Impala free the memory later.
+  blob_ = reinterpret_cast<char*>(pool->TryAllocateUnaligned(blob->size()));
+  if (UNLIKELY(blob_ == nullptr)) {
+    string details = Substitute("Could not allocate string buffer of $0 bytes "
+        "for ORC file '$1'.", blob->size(), scanner_->filename());
+    return scanner_->scan_node_->mem_tracker()->MemLimitExceeded(
+        scanner_->state_, details, blob->size());
+  }
+  memcpy(blob_, blob->data(), blob->size());
+  return Status::OK();
+}
+
 Status OrcStringColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
   if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
     SetNullSlot(tuple);
     return Status::OK();
   }
-  const char* src_ptr = batch_->data.data()[row_idx];
-  int64_t src_len = batch_->length.data()[row_idx];
+  char* src_ptr;
+  int src_len;
+
+  if (batch_->isEncoded) {
+    orc::EncodedStringVectorBatch* currentBatch =
+        static_cast<orc::EncodedStringVectorBatch*>(batch_);
+
+    orc::DataBuffer<int64_t>& offsets = currentBatch->dictionary->dictionaryOffset;
+    int64_t index = currentBatch->index[row_idx];
+    if (UNLIKELY(index < 0  || static_cast<uint64_t>(index) + 1 >= offsets.size())) {
+      return Status(Substitute("Corrupt ORC file: $0. Index ($1) out of range [0, $2) in "
+          "StringDictionary.", scanner_->filename(), index, offsets.size()));;
+    }
+    src_ptr = blob_ + offsets[index];
+    src_len = offsets[index + 1] - offsets[index];
+  } else {
+    // The pointed data is now in blob_, a buffer handled by Impala.
+    src_ptr = blob_ + (batch_->data[row_idx] - batch_->blob.data());
+    src_len = batch_->length[row_idx];
+  }
   int dst_len = slot_desc_->type().len;
   if (slot_desc_->type().type == TYPE_CHAR) {
     int unpadded_len = min(dst_len, static_cast<int>(src_len));
@@ -170,17 +202,7 @@ Status OrcStringColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool
   } else {
     dst->len = src_len;
   }
-  // Space in the StringVectorBatch is allocated by scanner_->reader_mem_pool_. It will
-  // be reused at next batch, so we allocate a new space for this string.
-  uint8_t* buffer = pool->TryAllocateUnaligned(dst->len);
-  if (buffer == nullptr) {
-    string details = Substitute("Could not allocate string buffer of $0 bytes "
-        "for ORC file '$1'.", dst->len, scanner_->filename());
-    return scanner_->scan_node_->mem_tracker()->MemLimitExceeded(
-        scanner_->state_, details, dst->len);
-  }
-  dst->ptr = reinterpret_cast<char*>(buffer);
-  memcpy(dst->ptr, src_ptr, dst->len);
+  dst->ptr = src_ptr;
   return Status::OK();
 }
 
@@ -348,21 +370,24 @@ Status OrcStructReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
   return Status::OK();
 }
 
-void OrcStructReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
-  OrcComplexColumnReader::UpdateInputBatch(orc_batch);
+Status OrcStructReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
+  RETURN_IF_ERROR(OrcComplexColumnReader::UpdateInputBatch(orc_batch));
   batch_ = static_cast<orc::StructVectorBatch*>(orc_batch);
   // In debug mode, we use dynamic_cast<> to double-check the downcast is legal
   DCHECK(batch_ == dynamic_cast<orc::StructVectorBatch*>(orc_batch));
   if (batch_ == nullptr || batch_->numElements == 0) {
     row_idx_ = 0;
-    for (OrcColumnReader* child : children_) child->UpdateInputBatch(nullptr);
-    return;
+    for (OrcColumnReader* child : children_) {
+      RETURN_IF_ERROR(child->UpdateInputBatch(nullptr));
+    }
+    return Status::OK();
   }
   row_idx_ = 0;
   int size = children_.size();
   for (int c = 0; c < size; ++c) {
-    children_[c]->UpdateInputBatch(batch_->fields[children_fields_[c]]);
+    RETURN_IF_ERROR(children_[c]->UpdateInputBatch(batch_->fields[children_fields_[c]]));
   }
+  return Status::OK();
 }
 
 Status OrcStructReader::TransferTuple(Tuple* tuple, MemPool* pool) {
@@ -447,17 +472,20 @@ OrcListReader::OrcListReader(const orc::Type* node, const SlotDescriptor* slot_d
       << (tuple_desc_ != nullptr ? tuple_desc_->DebugString() : "null");
 }
 
-void OrcListReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
-  OrcComplexColumnReader::UpdateInputBatch(orc_batch);
+Status OrcListReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
+  RETURN_IF_ERROR(OrcComplexColumnReader::UpdateInputBatch(orc_batch));
   batch_ = static_cast<orc::ListVectorBatch*>(orc_batch);
   // In debug mode, we use dynamic_cast<> to double-check the downcast is legal
   DCHECK(batch_ == dynamic_cast<orc::ListVectorBatch*>(orc_batch));
   orc::ColumnVectorBatch* item_batch = batch_ ? batch_->elements.get() : nullptr;
-  for (OrcColumnReader* child : children_) child->UpdateInputBatch(item_batch);
+  for (OrcColumnReader* child : children_) {
+    RETURN_IF_ERROR(child->UpdateInputBatch(item_batch));
+  }
   if (batch_) {
     row_idx_ = -1;
     NextRow();
   }
+  return Status::OK();
 }
 
 int OrcListReader::GetNumTuples(int row_idx) const {
@@ -581,19 +609,24 @@ OrcMapReader::OrcMapReader(const orc::Type* node, const SlotDescriptor* slot_des
   }
 }
 
-void OrcMapReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
-  OrcComplexColumnReader::UpdateInputBatch(orc_batch);
+Status OrcMapReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
+  RETURN_IF_ERROR(OrcComplexColumnReader::UpdateInputBatch(orc_batch));
   batch_ = static_cast<orc::MapVectorBatch*>(orc_batch);
   // In debug mode, we use dynamic_cast<> to double-check the downcast is legal
   DCHECK(batch_ == dynamic_cast<orc::MapVectorBatch*>(orc_batch));
   orc::ColumnVectorBatch* key_batch = batch_ ? batch_->keys.get() : nullptr;
   orc::ColumnVectorBatch* value_batch = batch_ ? batch_->elements.get() : nullptr;
-  for (OrcColumnReader* child : key_readers_) child->UpdateInputBatch(key_batch);
-  for (OrcColumnReader* child : value_readers_) child->UpdateInputBatch(value_batch);
+  for (OrcColumnReader* child : key_readers_) {
+    RETURN_IF_ERROR(child->UpdateInputBatch(key_batch));
+  }
+  for (OrcColumnReader* child : value_readers_) {
+    RETURN_IF_ERROR(child->UpdateInputBatch(value_batch));
+  }
   if (batch_) {
     row_idx_ = -1;
     NextRow();
   }
+  return Status::OK();
 }
 
 void OrcMapReader::NextRow() {
diff --git a/be/src/exec/orc-column-readers.h b/be/src/exec/orc-column-readers.h
index 222b82d..d88c805 100644
--- a/be/src/exec/orc-column-readers.h
+++ b/be/src/exec/orc-column-readers.h
@@ -85,7 +85,8 @@ class OrcColumnReader {
   virtual bool IsCollectionReader() const { return false; }
 
   /// Update the orc batch we tracked. We'll read values from it.
-  virtual void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) = 0;
+  virtual Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch)
+      WARN_UNUSED_RESULT = 0;
 
   /// Read value at 'row_idx' of the ColumnVectorBatch into a slot of the given 'tuple'.
   /// Use 'pool' to allocate memory in need. Depends on the UpdateInputBatch being called
@@ -125,10 +126,11 @@ class OrcBoolColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::LongVectorBatch*>(orc_batch);
     // In debug mode, we use dynamic_cast<> to double-check the downcast is legal
     DCHECK(batch_ == dynamic_cast<orc::LongVectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
@@ -143,9 +145,10 @@ class OrcIntColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::LongVectorBatch*>(orc_batch);
     DCHECK(batch_ == static_cast<orc::LongVectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
@@ -169,9 +172,10 @@ class OrcDoubleColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::DoubleVectorBatch*>(orc_batch);
     DCHECK(batch_ == dynamic_cast<orc::DoubleVectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
@@ -194,14 +198,43 @@ class OrcStringColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::StringVectorBatch*>(orc_batch);
-    DCHECK(batch_ == dynamic_cast<orc::StringVectorBatch*>(orc_batch));
+    if (orc_batch == nullptr) return Status::OK();
+    // We update the blob of a non-encoded batch every time, but since the dictionary blob
+    // is the same for the stripe, we only reset it for every new stripe.
+    // Note that this is possible since the encoding should be the same for every batch
+    // through the whole stripe.
+    if(!orc_batch->isEncoded) {
+      DCHECK(batch_ == dynamic_cast<orc::StringVectorBatch*>(orc_batch));
+      return InitBlob(&batch_->blob, scanner_->data_batch_pool_.get());
+    }
+    DCHECK(static_cast<orc::EncodedStringVectorBatch*>(batch_) ==
+        dynamic_cast<orc::EncodedStringVectorBatch*>(orc_batch));
+    if (last_stripe_idx_ != scanner_->stripe_idx_) {
+      last_stripe_idx_ = scanner_->stripe_idx_;
+      auto current_batch = static_cast<orc::EncodedStringVectorBatch*>(batch_);
+      return InitBlob(&current_batch->dictionary->dictionaryBlob,
+          scanner_->dictionary_pool_.get());
+    }
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
  private:
   orc::StringVectorBatch* batch_ = nullptr;
+  // We copy the blob from the batch, so the memory will be handled by Impala, and not
+  // by the ORC lib.
+  char* blob_ = nullptr;
+
+  // We cache the last stripe so we know when we have to update the blob (in case of
+  // dictionary encoding).
+  int last_stripe_idx_ = -1;
+
+  /// Initializes the blob if it has not been already in the current batch.
+  /// Unfortunately, this cannot be done in UpdateInputBatch, since we do not have
+  /// access to the pool there.
+  Status InitBlob(orc::DataBuffer<char>* blob, MemPool* pool);
 };
 
 class OrcTimestampReader : public OrcColumnReader {
@@ -210,9 +243,10 @@ class OrcTimestampReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::TimestampVectorBatch*>(orc_batch);
     DCHECK(batch_ == dynamic_cast<orc::TimestampVectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
@@ -226,9 +260,10 @@ class OrcDateColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::LongVectorBatch*>(orc_batch);
     DCHECK(batch_ == dynamic_cast<orc::LongVectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
@@ -243,10 +278,11 @@ class OrcDecimalColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     // Reminder: even decimal(1,1) is stored in int64 batch
     batch_ = static_cast<orc::Decimal64VectorBatch*>(orc_batch);
     DCHECK(batch_ == dynamic_cast<orc::Decimal64VectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
@@ -269,9 +305,10 @@ class OrcDecimal16ColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::Decimal128VectorBatch*>(orc_batch);
     DCHECK(batch_ == dynamic_cast<orc::Decimal128VectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
@@ -344,8 +381,9 @@ class OrcComplexColumnReader : public OrcColumnReader {
   /// Whether we've finished reading the current orc batch.
   bool EndOfBatch();
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     vbatch_ = orc_batch;
+    return Status::OK();
   }
 
   /// Assemble current collection value (tracked by 'row_idx_') into a top level 'tuple'.
@@ -385,7 +423,7 @@ class OrcStructReader : public OrcComplexColumnReader {
   OrcStructReader(const orc::Type* node, const SlotDescriptor* slot_desc,
       HdfsOrcScanner* scanner);
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override;
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT;
 
   Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
 
@@ -442,7 +480,7 @@ class OrcListReader : public OrcCollectionReader {
   OrcListReader(const orc::Type* node, const SlotDescriptor* slot_desc,
       HdfsOrcScanner* scanner);
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override;
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT;
 
   Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
 
@@ -474,7 +512,7 @@ class OrcMapReader : public OrcCollectionReader {
   OrcMapReader(const orc::Type* node, const SlotDescriptor* slot_desc,
       HdfsOrcScanner* scanner);
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override;
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT;
 
   Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;