Posted to commits@impala.apache.org by bo...@apache.org on 2020/04/20 13:34:44 UTC

[impala] 02/02: IMPALA-9469: ORC scanner vectorization for collection types

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 248c6d2495d4628c10e6bdbb00f9ed170bba19b6
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Thu Apr 9 13:16:15 2020 +0200

    IMPALA-9469: ORC scanner vectorization for collection types
    
    This commit keeps only the batched read path for ORC columns, i.e.
    from now on we always read ORC values into a scratch batch. As a
    result we also get codegen out of the box.
    
    From now on materialization of the table-level tuples is always driven
    by the root struct reader. This will make it much easier to implement
    row validation (against a valid write id list), which is needed for
    IMPALA-9512.
    
    I eliminated the OrcComplexColumnReader::TransferTuple() interface
    and the related code. HdfsOrcScanner became simpler. Now it just calls
    TopLevelReadValueBatch() on the root struct reader, which tracks the
    row index of the table-level tuples and calls ReadValueBatch() on its
    children accordingly. The children don't need to track any state,
    as they are always told which row to read.
    
    Testing:
     * ran exhaustive tests
    
    Performance:
     * non-nested benchmark results stayed the same, as expected
     * Overall 1-2% gain on TPCH Nested, scale=1
     ** In some cases scanning was ~20% more efficient
    
    Change-Id: I477961b427406035a04529c5175dbee8f8a93ad5
    Reviewed-on: http://gerrit.cloudera.org:8080/15730
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
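
The pattern the commit message describes can be sketched as a self-contained toy;
this is not Impala code, and RootStructReader, ChildReader and the scratch vector
are made-up stand-ins. Only the root reader remembers how far the current ORC batch
has been consumed, and it passes an explicit row index down to its children on every
batched read:

    #include <algorithm>
    #include <cstdio>
    #include <vector>

    // Stand-in for a leaf column reader: it never tracks scan progress itself and is
    // always told which rows of its ORC batch to copy into the scratch area.
    struct ChildReader {
      std::vector<int> column;  // stand-in for an orc::ColumnVectorBatch
      void ReadValueBatch(int row_idx, int count, std::vector<int>* scratch) const {
        scratch->insert(scratch->end(), column.begin() + row_idx,
                        column.begin() + row_idx + count);
      }
    };

    // Stand-in for the root struct reader: the only object that remembers how far we
    // have read into the current ORC batch.
    struct RootStructReader {
      std::vector<ChildReader> children;
      int num_rows = 0;
      int row_idx = 0;
      bool EndOfBatch() const { return row_idx == num_rows; }
      void TopLevelReadValueBatch(int capacity, std::vector<int>* scratch) {
        int n = std::min(capacity, num_rows - row_idx);
        for (const ChildReader& child : children) {
          child.ReadValueBatch(row_idx, n, scratch);
        }
        row_idx += n;
      }
    };

    int main() {
      RootStructReader root;
      root.children = {ChildReader{{1, 2, 3, 4, 5}}};
      root.num_rows = 5;
      while (!root.EndOfBatch()) {
        std::vector<int> scratch;  // stand-in for the scratch batch
        root.TopLevelReadValueBatch(/*capacity=*/2, &scratch);
        for (int v : scratch) std::printf("%d ", v);  // 1 2 | 3 4 | 5
      }
      std::printf("\n");
      return 0;
    }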
---
 be/src/exec/hdfs-orc-scanner.cc   |  72 +++---------
 be/src/exec/hdfs-orc-scanner.h    |   6 +-
 be/src/exec/orc-column-readers.cc | 174 +++++++++++++---------------
 be/src/exec/orc-column-readers.h  | 238 ++++++++++++++++++++------------------
 4 files changed, 226 insertions(+), 264 deletions(-)
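
Below, orc-column-readers.h also generalizes the CRTP batching helper: the old
OrcPrimitiveColumnReader<T> base becomes OrcBatchedReader<Final>, whose batch loop
calls the derived reader's ReadValue() through the most-derived type so the call can
be devirtualized. A self-contained toy of that idea (not Impala code; Reader,
BatchedReader and IntReader are invented names):

    #include <cstdio>
    #include <vector>

    struct Reader {
      virtual ~Reader() = default;
      virtual int ReadValue(int row_idx) const = 0;
      virtual int ReadValueBatch(int row_idx, int count) const = 0;
    };

    // The batch loop lives in the CRTP base and calls ReadValue() through a pointer of
    // the most-derived type, so a ReadValue() marked 'final' there can be resolved
    // statically and inlined instead of going through the vtable once per row.
    template <class Final>
    struct BatchedReader : Reader {
      int ReadValueBatch(int row_idx, int count) const override {
        const Final* self = static_cast<const Final*>(this);
        int sum = 0;
        for (int i = 0; i < count; ++i) sum += self->ReadValue(row_idx + i);
        return sum;
      }
    };

    struct IntReader : BatchedReader<IntReader> {
      std::vector<int> batch;
      int ReadValue(int row_idx) const final { return batch[row_idx]; }
    };

    int main() {
      IntReader r;
      r.batch = {10, 20, 30};
      std::printf("%d\n", r.ReadValueBatch(0, 3));  // prints 60
      return 0;
    }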

diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index 0b1a599..7967cb2 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -533,8 +533,7 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
   // 'TransferTuples' here.
   if (!orc_root_reader_->EndOfBatch()) {
     assemble_rows_timer_.Start();
-    RETURN_IF_ERROR(TransferTuples(orc_root_reader_, row_batch,
-        !orc_root_reader_->HasCollectionChild()));
+    RETURN_IF_ERROR(TransferTuples(row_batch));
     assemble_rows_timer_.Stop();
     if (row_batch->AtCapacity()) return Status::OK();
     DCHECK(orc_root_reader_->EndOfBatch());
@@ -694,8 +693,7 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
       num_rows_read += orc_root_batch_->numElements;
     }
 
-    RETURN_IF_ERROR(TransferTuples(orc_root_reader_, row_batch,
-       !orc_root_reader_->HasCollectionChild()));
+    RETURN_IF_ERROR(TransferTuples(row_batch));
     if (row_batch->AtCapacity()) break;
     continue_execution &= !scan_node_->ReachedLimitShared() && !context_->cancelled();
   }
@@ -707,62 +705,26 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
   return Status::OK();
 }
 
-Status HdfsOrcScanner::TransferTuples(OrcComplexColumnReader* coll_reader,
-    RowBatch* dst_batch, bool do_batch_read) {
-  if (!coll_reader->MaterializeTuple()) {
-    // Top-level readers that are not materializing tuples will delegate the
-    // materialization to its unique child.
-    DCHECK_EQ(coll_reader->children().size(), 1);
-    OrcColumnReader* child = coll_reader->children()[0];
-    // Only complex type readers can be top-level readers.
-    DCHECK(child->IsComplexColumnReader());
-    return TransferTuples(
-        static_cast<OrcComplexColumnReader*>(child), dst_batch, do_batch_read);
-  }
-  const TupleDescriptor* tuple_desc = scan_node_->tuple_desc();
-
-  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_->data();
-  int num_conjuncts = conjunct_evals_->size();
-
+Status HdfsOrcScanner::TransferTuples(RowBatch* dst_batch) {
   DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
   if (tuple_ == nullptr) RETURN_IF_ERROR(AllocateTupleMem(dst_batch));
   int row_id = dst_batch->num_rows();
   int capacity = dst_batch->capacity();
-  int num_to_commit = 0;
-  TupleRow* dst_row = dst_batch->GetRow(row_id);
-  Tuple* tuple = tuple_;  // tuple_ is updated in CommitRows
-
-  // TODO(IMPALA-6506): codegen the runtime filter + conjunct evaluation loop
-  while (row_id < capacity && !coll_reader->EndOfBatch()) {
-    if (do_batch_read) {
-      DCHECK(scratch_batch_ != nullptr);
-      DCHECK(scratch_batch_->AtEnd());
-      RETURN_IF_ERROR(scratch_batch_->Reset(state_));
-      InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem,
-          scratch_batch_->capacity);
-
-      RETURN_IF_ERROR(coll_reader->TopLevelReadValueBatch(scratch_batch_.get(),
-          &scratch_batch_->aux_mem_pool));
-      int num_tuples_transferred = TransferScratchTuples(dst_batch);
-      row_id += num_tuples_transferred;
-      num_to_commit += num_tuples_transferred;
-    } else {
-      if (tuple_desc->byte_size() > 0) DCHECK_LT((void*)tuple, (void*)tuple_mem_end_);
-      InitTuple(tuple_desc, template_tuple_, tuple);
-      RETURN_IF_ERROR(coll_reader->TransferTuple(tuple, dst_batch->tuple_data_pool()));
-      dst_row->SetTuple(0, tuple);
-      if (!EvalRuntimeFilters(dst_row)) continue;
-      if (ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, dst_row)) {
-        dst_row = next_row(dst_row);
-        tuple = next_tuple(tuple_desc->byte_size(), tuple);
-        ++row_id;
-        ++num_to_commit;
-      }
-    }
+
+  while (row_id < capacity && !orc_root_reader_->EndOfBatch()) {
+    DCHECK(scratch_batch_ != nullptr);
+    DCHECK(scratch_batch_->AtEnd());
+    RETURN_IF_ERROR(scratch_batch_->Reset(state_));
+    InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem, scratch_batch_->capacity);
+    RETURN_IF_ERROR(orc_root_reader_->TopLevelReadValueBatch(scratch_batch_.get(),
+        &scratch_batch_->aux_mem_pool));
+    int num_tuples_transferred = TransferScratchTuples(dst_batch);
+    row_id += num_tuples_transferred;
+    VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch ($1 rows)",
+        num_tuples_transferred, dst_batch->num_rows());
+    RETURN_IF_ERROR(CommitRows(num_tuples_transferred, dst_batch));
   }
-  VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch ($1 rows)",
-      num_to_commit, dst_batch->num_rows());
-  return CommitRows(num_to_commit, dst_batch);
+  return Status::OK();
 }
 
 Status HdfsOrcScanner::AllocateTupleMem(RowBatch* row_batch) {
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 7412c22..728faee 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -261,11 +261,7 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   /// filters and conjuncts (if any) against the tuples. Only surviving tuples are added
   /// to the given batch. Returns if either 'orc_root_batch_' is drained or 'dst_batch'
   /// is full.
-  /// 'do_batch_read' indicates if this function should transfer rows in batches or in a
-  /// row-by-row manner. Batch processing is not used if 'column_reader' has a collection
-  /// amongst its children.
-  Status TransferTuples(OrcComplexColumnReader* column_reader,
-      RowBatch* dst_batch, bool do_batch_read) WARN_UNUSED_RESULT;
+  Status TransferTuples(RowBatch* dst_batch) WARN_UNUSED_RESULT;
 
   /// Process the file footer and parse file_metadata_.  This should be called with the
   /// last FOOTER_SIZE bytes in context_.
diff --git a/be/src/exec/orc-column-readers.cc b/be/src/exec/orc-column-readers.cc
index 872282e..72fe3f4 100644
--- a/be/src/exec/orc-column-readers.cc
+++ b/be/src/exec/orc-column-readers.cc
@@ -263,7 +263,7 @@ Status OrcDecimal16ColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* p
 
 OrcComplexColumnReader::OrcComplexColumnReader(const orc::Type* node,
     const TupleDescriptor* table_tuple_desc, HdfsOrcScanner* scanner)
-    : OrcColumnReader(node, nullptr, scanner) {
+    : OrcBatchedReader(node, nullptr, scanner) {
   SchemaPath& path = scanner->col_id_path_map_[node->getColumnId()];
   if (path == table_tuple_desc->tuple_path()) tuple_desc_ = table_tuple_desc;
   materialize_tuple_ = (tuple_desc_ != nullptr);
@@ -271,36 +271,12 @@ OrcComplexColumnReader::OrcComplexColumnReader(const orc::Type* node,
       << ": tuple_desc_=" << (tuple_desc_ ? tuple_desc_->DebugString() : "null");
 }
 
-bool OrcComplexColumnReader::EndOfBatch() {
+bool OrcStructReader::EndOfBatch() {
   DCHECK(slot_desc_ == nullptr
       && (tuple_desc_ == nullptr || tuple_desc_ == scanner_->scan_node_->tuple_desc()))
       << "Should be top level reader when calling EndOfBatch()";
-  if (!materialize_tuple_) {
-    // If this reader is not materializing tuples, its 'row_idx_' is invalid and the
-    // progress is tracked in the child. Delegate the judgement to the child recursively.
-    DCHECK_EQ(children_.size(), 1);
-    return static_cast<OrcComplexColumnReader*>(children_[0])->EndOfBatch();
-  }
-  DCHECK(vbatch_ == nullptr || row_idx_ <= vbatch_->numElements);
-  return vbatch_ == nullptr || row_idx_ == vbatch_->numElements;
-}
-
-bool OrcComplexColumnReader::HasCollectionChild() const {
-  return HasCollectionChildRecursive(this);
-}
-
-bool OrcComplexColumnReader::HasCollectionChildRecursive(
-    const OrcColumnReader* reader) const {
-  DCHECK(reader != nullptr);
-  if (reader->IsCollectionReader()) return true;
-  if (!reader->IsComplexColumnReader()) return false;
-  const OrcComplexColumnReader* complex_reader =
-      static_cast<const OrcComplexColumnReader*>(reader);
-  DCHECK(complex_reader == dynamic_cast<const OrcComplexColumnReader*>(reader));
-  for (OrcColumnReader* child : complex_reader->children_) {
-    if (HasCollectionChildRecursive(child)) return true;
-  }
-  return false;
+  DCHECK(vbatch_ == nullptr || row_idx_ <= NumElements());
+  return vbatch_ == nullptr || row_idx_ == NumElements();
 }
 
 inline bool PathContains(const SchemaPath& path, const SchemaPath& sub_path) {
@@ -381,6 +357,11 @@ OrcStructReader::OrcStructReader(const orc::Type* node,
 }
 
 Status OrcStructReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
+  if (!MaterializeTuple()) {
+    DCHECK_EQ(1, children_.size());
+    OrcColumnReader* child = children_[0];
+    return child->ReadValue(row_idx, tuple, pool);
+  }
   if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
     for (OrcColumnReader* child : children_) child->SetNullSlot(tuple);
     return Status::OK();
@@ -415,7 +396,6 @@ Status OrcStructReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch,
 Status OrcStructReader::ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch,
     MemPool* pool, int scratch_batch_idx) {
   for (OrcColumnReader* child : children_) {
-    DCHECK(!child->IsCollectionReader());
     RETURN_IF_ERROR(
         child->ReadValueBatch(row_idx, scratch_batch, pool, scratch_batch_idx));
   }
@@ -442,14 +422,6 @@ Status OrcStructReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
   return Status::OK();
 }
 
-Status OrcStructReader::TransferTuple(Tuple* tuple, MemPool* pool) {
-  for (OrcColumnReader* child : children_) {
-    RETURN_IF_ERROR(child->ReadValue(row_idx_, tuple, pool));
-  }
-  ++row_idx_;
-  return Status::OK();
-}
-
 OrcCollectionReader::OrcCollectionReader(const orc::Type* node,
     const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner)
     : OrcComplexColumnReader(node, slot_desc, scanner) {
@@ -464,7 +436,7 @@ OrcCollectionReader::OrcCollectionReader(const orc::Type* node,
   }
 }
 
-Status OrcCollectionReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
+Status OrcCollectionReader::AssembleCollection(int row_idx, Tuple* tuple, MemPool* pool) {
   if (IsNull(DCHECK_NOTNULL(vbatch_), row_idx)) {
     SetNullSlot(tuple);
     return Status::OK();
@@ -476,15 +448,69 @@ Status OrcCollectionReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
   return scanner_->AssembleCollection(*this, row_idx, &builder);
 }
 
-Status OrcCollectionReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch,
-    MemPool* pool) {
-  DCHECK(false);
+int OrcListReader::NumElements() const {
+  if (DirectReader()) return batch_ != nullptr ? batch_->numElements : 0;
+  if (children_.empty()) {
+    return batch_ != nullptr ?  batch_->offsets[batch_->numElements] : 0;
+  }
+  return children_[0]->NumElements();
+}
+
+Status OrcListReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
+  if (DirectReader()) return AssembleCollection(row_idx, tuple, pool);
+
+  for (OrcColumnReader* child : children_) {
+    RETURN_IF_ERROR(child->ReadValue(row_idx, tuple, pool));
+  }
+  if (pos_slot_desc_ != nullptr) {
+    RETURN_IF_ERROR(SetPositionSlot(row_idx, tuple));
+  }
   return Status::OK();
 }
 
-Status OrcCollectionReader::ReadValueBatch(int row_idx,
-    ScratchTupleBatch* scratch_batch, MemPool* pool, int scratch_batch_idx) {
-  DCHECK(false);
+Status OrcMapReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
+  if (DirectReader()) return AssembleCollection(row_idx, tuple, pool);
+
+  for (OrcColumnReader* child : children_) {
+    RETURN_IF_ERROR(child->ReadValue(row_idx, tuple, pool));
+  }
+  return Status::OK();
+}
+
+Status OrcListReader::SetPositionSlot(int row_idx, Tuple* tuple) {
+  DCHECK(pos_slot_desc_ != nullptr);
+
+  int64_t pos = -1;
+  DCHECK_LT(list_idx_, batch_->numElements);
+  if (list_idx_ == batch_->numElements - 1 ||
+      (batch_->offsets[list_idx_] <= row_idx && row_idx < batch_->offsets[list_idx_+1])) {
+    // We are somewhere in the current list.
+    pos = row_idx - batch_->offsets[list_idx_];
+  } else if (row_idx == batch_->offsets[list_idx_+1]) {
+    // Let's move to the next list.
+    pos = 0;
+    list_idx_ += 1;
+  }
+  else if (row_idx > batch_->offsets[list_idx_+1]) {
+    // We lagged behind. Let's find our list.
+    for (int i = list_idx_; i < batch_->numElements; ++i) {
+      if (row_idx < batch_->offsets[i+1]) {
+        pos = row_idx - batch_->offsets[i];
+        list_idx_ = i;
+        break;
+      }
+    }
+  }
+  if (pos < 0) {
+      // Oops, something went wrong. It can be caused by a corrupt file, so let's raise
+      // an error.
+      return Status(Substitute(
+          "ORC list indexes and elements are inconsistent in file $0",
+          scanner_->filename()));
+  }
+  int64_t* slot_val_ptr = reinterpret_cast<int64_t*>(tuple->GetSlot(
+      pos_slot_desc_->tuple_offset()));
+  *slot_val_ptr = pos;
   return Status::OK();
 }
 
@@ -545,10 +571,7 @@ Status OrcListReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
   for (OrcColumnReader* child : children_) {
     RETURN_IF_ERROR(child->UpdateInputBatch(item_batch));
   }
-  if (batch_) {
-    row_idx_ = -1;
-    NextRow();
-  }
+  list_idx_ = 0;
   return Status::OK();
 }
 
@@ -562,20 +585,6 @@ int OrcListReader::GetChildBatchOffset(int row_idx) const {
   return batch_->offsets[row_idx];
 }
 
-Status OrcListReader::TransferTuple(Tuple* tuple, MemPool* pool) {
-  if (pos_slot_desc_) {
-    int64_t* slot_val_ptr = reinterpret_cast<int64_t*>(
-        tuple->GetSlot(pos_slot_desc_->tuple_offset()));
-    *slot_val_ptr = array_idx_;
-  }
-  for (OrcColumnReader* child : children_) {
-    RETURN_IF_ERROR(child->ReadValue(array_start_ + array_idx_, tuple, pool));
-  }
-  array_idx_++;
-  if (array_start_ + array_idx_ >= array_end_) NextRow();
-  return Status::OK();
-}
-
 Status OrcListReader::ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
     MemPool* pool) const {
   DCHECK_LT(row_idx, batch_->numElements);
@@ -591,16 +600,6 @@ Status OrcListReader::ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple
   return Status::OK();
 }
 
-void OrcListReader::NextRow() {
-  do {
-    ++row_idx_;
-    if (row_idx_ >= batch_->numElements) break;
-    array_start_ = batch_->offsets[row_idx_];
-    array_end_ = batch_->offsets[row_idx_ + 1];
-  } while (IsNull(batch_, row_idx_) || array_start_ == array_end_);
-  array_idx_ = 0;
-}
-
 void OrcMapReader::CreateChildForSlot(const orc::Type* node,
     const SlotDescriptor* slot_desc) {
   const SchemaPath& path = scanner_->col_id_path_map_[node->getColumnId()];
@@ -686,22 +685,9 @@ Status OrcMapReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
   for (OrcColumnReader* child : value_readers_) {
     RETURN_IF_ERROR(child->UpdateInputBatch(value_batch));
   }
-  if (batch_) {
-    row_idx_ = -1;
-    NextRow();
-  }
   return Status::OK();
 }
 
-void OrcMapReader::NextRow() {
-  do {
-    ++row_idx_;
-    if (row_idx_ >= batch_->numElements) break;
-    array_offset_ = batch_->offsets[row_idx_];
-    array_end_ = batch_->offsets[row_idx_ + 1];
-  } while (IsNull(batch_, row_idx_) || array_offset_ == array_end_);
-}
-
 int OrcMapReader::GetNumTuples(int row_idx) const {
   if (IsNull(batch_, row_idx)) return 0;
   DCHECK_GT(batch_->offsets.size(), row_idx + 1);
@@ -712,15 +698,6 @@ int OrcMapReader::GetChildBatchOffset(int row_idx) const {
   return batch_->offsets[row_idx];
 }
 
-Status OrcMapReader::TransferTuple(Tuple* tuple, MemPool* pool) {
-  for (OrcColumnReader* child : children_) {
-    RETURN_IF_ERROR(child->ReadValue(array_offset_, tuple, pool));
-  }
-  array_offset_++;
-  if (array_offset_ >= array_end_) NextRow();
-  return Status::OK();
-}
-
 Status OrcMapReader::ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
     MemPool* pool) const {
   DCHECK_LT(row_idx, batch_->numElements);
@@ -730,4 +707,13 @@ Status OrcMapReader::ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
   }
   return Status::OK();
 }
+
+int OrcMapReader::NumElements() const {
+  if (DirectReader()) return batch_ != nullptr ? batch_->numElements : 0;
+  if (children_.empty()) {
+    return batch_ != nullptr ? batch_->offsets[batch_->numElements] : 0;
+  }
+  return children_[0]->NumElements();
 }
+
+} // namespace impala
diff --git a/be/src/exec/orc-column-readers.h b/be/src/exec/orc-column-readers.h
index 0e6a0fc..001b913 100644
--- a/be/src/exec/orc-column-readers.h
+++ b/be/src/exec/orc-column-readers.h
@@ -47,9 +47,9 @@ class HdfsOrcScanner;
 ///     }
 ///   }
 ///
-/// For complex types readers, they can be top-level readers (readers materializing
-/// table level tuples), so we need more interface to deal with table/collection level
-/// tuple materialization. See more in the class comments of OrcComplexColumnReader.
+/// The root reader is always an OrcStructReader; it drives the materialization of the
+/// table level tuples, so we need extra interfaces to deal with table/collection level
+/// tuple materialization. See more in the class comments of OrcStructReader.
 class OrcColumnReader {
  public:
   /// Create a column reader for the given 'slot_desc' based on the ORC 'node'. We say
@@ -65,7 +65,7 @@ class OrcColumnReader {
   static OrcColumnReader* Create(const orc::Type* node, const SlotDescriptor* slot_desc,
       HdfsOrcScanner* scanner);
 
-  /// Base constructor for all types of readers that hold a SlotDescriptor (non top-level
+  /// Base constructor for all types of readers that hold a SlotDescriptor (direct
   /// readers). Primitive column readers will materialize values into the slot. STRUCT
   /// column readers will delegate the slot materialization to its children. Collection
   /// column (ARRAY/MAP) readers will create CollectionValue in the slot and assemble
@@ -101,8 +101,23 @@ class OrcColumnReader {
   virtual Status ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch,
       MemPool* pool, int scratch_batch_idx) WARN_UNUSED_RESULT = 0;
 
+  /// Returns the number of tuples this reader directly or indirectly writes. E.g. if it
+  /// is a primitive reader it returns 'batch_->numElements'. If it is a collection
+  /// reader then it depends on whether it writes the tuple directly or indirectly. If it
+  /// writes the tuple directly, i.e. it creates a collection value, then it also returns
+  /// 'batch_->numElements' which is the number of collection values (lists, maps) it
+  /// writes. On the other hand, if it writes the tuple indirectly, i.e. it delegates the
+  /// writing to its child then it returns child->NumElements().
+  /// E.g. if there's a column named 'arr', and its type is array<array<array<int>>>, and
+  /// the query is
+  /// "SELECT item from t.arr.item.item"
+  /// then NumElements() for the top OrcListReader returns the number of elements in the
+  /// OrcIntColumnReader at the bottom.
+  virtual int NumElements() const = 0;
+
  protected:
   friend class OrcStructReader;
+  friend class OrcCollectionReader;
 
   /// Convenient field for debug. We can't keep the pointer of orc::Type since they'll be
   /// destroyed after orc::RowReader was released. Only keep the id orc::Type here.
@@ -128,47 +143,64 @@ class OrcColumnReader {
   }
 };
 
-/// The main purpose of this class other than providing implementations relevant only for
-/// primitive type column readers is to implement static polymorphism via the "curiously
-/// recurring template pattern". All the derived classes are expected to provide
-/// themselves as the template parameter of this class. As a result the number of virtual
+/// The main purpose of this class is to implement static polymorphism via the "curiously
+/// recurring template pattern". All the derived classes are expected to provide a
+/// subclass as the template parameter of this class. As a result the number of virtual
 /// function calls can be reduced in ReadValueBatch() as we can directly call non-virtual
 /// ReadValue() of the derived class.
-template<class T>
-class OrcPrimitiveColumnReader : public OrcColumnReader {
+template<class Final>
+class OrcBatchedReader : public OrcColumnReader {
  public:
-  OrcPrimitiveColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
+  OrcBatchedReader(const orc::Type* node, const SlotDescriptor* slot_desc,
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) {}
 
-  virtual ~OrcPrimitiveColumnReader() { }
-
-  bool IsComplexColumnReader() const override { return false; }
-
-  bool IsCollectionReader() const override { return false; }
-
-  bool MaterializeTuple() const override { return true; }
-
   Status ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch, MemPool* pool,
       int scratch_batch_idx) override WARN_UNUSED_RESULT {
-    T* derived = static_cast<T*>(this);
+    Final* final = this->GetFinal();
     int num_to_read = std::min<int>(scratch_batch->capacity - scratch_batch_idx,
-        derived->batch_->numElements - row_idx);
-    DCHECK_LE(row_idx + num_to_read, derived->batch_->numElements);
+        final->NumElements() - row_idx);
+    DCHECK_LE(row_idx + num_to_read, final->NumElements());
     for (int i = 0; i < num_to_read; ++i) {
       int scratch_batch_pos = i + scratch_batch_idx;
       uint8_t* next_tuple = scratch_batch->tuple_mem +
           scratch_batch_pos * OrcColumnReader::scanner_->tuple_byte_size();
       Tuple* tuple = reinterpret_cast<Tuple*>(next_tuple);
-
-      // Make sure that each ReadValue() is final in each derived class. This way
-      // devirtualization helps to reduce the number of virtual function calls, and as a
-      // result to improve performance.
-      RETURN_IF_ERROR(derived->ReadValue(row_idx + i, tuple, pool));
+      // The compiler will devirtualize the call to ReadValue() if it is marked 'final' in
+      // the 'Final' class. This way we can reduce the number of virtual function calls
+      // to improve performance.
+      RETURN_IF_ERROR(final->ReadValue(row_idx + i, tuple, pool));
     }
     scratch_batch->num_tuples = scratch_batch_idx + num_to_read;
     return Status::OK();
   }
+
+  Final* GetFinal() { return static_cast<Final*>(this); }
+  const Final* GetFinal() const { return static_cast<const Final*>(this); }
+};
+
+/// Base class for primitive types. It implements the common functions with 'final'
+/// annotation. Template parameter 'Final' holds a concrete primitive column reader and is
+/// propagated to OrcBatchedReader.
+template<class Final>
+class OrcPrimitiveColumnReader : public OrcBatchedReader<Final> {
+ public:
+  OrcPrimitiveColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
+      HdfsOrcScanner* scanner)
+      : OrcBatchedReader<Final>(node, slot_desc, scanner) {}
+  virtual ~OrcPrimitiveColumnReader() {}
+
+  bool IsComplexColumnReader() const final { return false; }
+
+  bool IsCollectionReader() const final { return false; }
+
+  bool MaterializeTuple() const final { return true; }
+
+  int NumElements() const final {
+    const Final* final = this->GetFinal();
+    if (final->batch_ == nullptr) return 0;
+    return final->batch_->numElements;
+  }
 };
 
 class OrcBoolColumnReader : public OrcPrimitiveColumnReader<OrcBoolColumnReader> {
@@ -413,56 +445,33 @@ class OrcDecimal16ColumnReader
 /// sub queries). The root reader is always an OrcStructReader since the root of the ORC
 /// schema is represented as a STRUCT type.
 ///
-/// Only OrcComplexColumnReaders can be top-level readers: readers that control the
-/// materialization of the top-level tuples, whether directly or indirectly (by its
-/// unique child).
+/// Collection readers can be divided into two kinds by whether they should
+/// materialize collection tuples (reflected by materialize_tuple_). (STRUCTs always
+/// delegate materialization to their children.)
 ///
-/// There're only one top-level reader that directly materializes top-level(table-level)
-/// tuples: the reader whose orc_node matches the tuple_path of the top-level
-/// TupleDescriptor. For the only top-level reader that directly materializes top-level
-/// tuples, the usage of the interfaces follows the pattern:
-///   while ( /* has new batch in the stripe */ ) {
-///     reader->UpdateInputBatch(orc_batch);
-///     while (!reader->EndOfBatch()) {
-///       tuple = ...  // Init tuple
-///       reader->ReadValueBatch(scratch_batch, mem_pool);
-///     }
-///   }
-/// 'ReadValueBatch' don't require a row index since the top-level reader will keep
-/// track of the progress by internal fields:
-///   * STRUCT reader: row_idx_
-///   * LIST reader: row_idx_, array_start_, array_idx_, array_end_
-///   * MAP reader: row_idx_, array_offset_, array_end_
-///
-/// For top-level readers that indirectly materializes tuples, they are ancestors of the
-/// above reader. Such kind of readers just UpdateInputBatch (so update children's
-/// recursively) and then delegate the materialization to their children. (See more in
-/// HdfsOrcScanner::TransferTuples)
-///
-/// For non top-level readers, they can be divided into two kinds by whether they should
-/// materialize collection tuples (reflected by materialize_tuple_). STRUCT is not a
-/// collection type so non top-level STRUCT readers always have materialize_tuple_ being
-/// false as default.
-///
-/// For non top-level collection type readers, they create a CollectionValue and a
+/// Collection type readers that materialize a CollectionValue create a
 /// CollectionValueBuilder when 'ReadValue' is called. Then recursively delegate the
 /// materialization of collection tuples to the child that matches the TupleDescriptor.
 /// This child tracks the boundary of current collection and call 'ReadChildrenValue' to
-/// assemble collection tuples. (See more in HdfsOrcScanner::AssembleCollection)
+/// assemble collection tuples. (See more in HdfsOrcScanner::AssembleCollection).
+/// If they don't materialize collection values then they just delegate the reading to
+/// their children.
 ///
 /// Children readers are created in the constructor recursively.
-class OrcComplexColumnReader : public OrcColumnReader {
+class OrcComplexColumnReader : public OrcBatchedReader<OrcComplexColumnReader> {
  public:
   static OrcComplexColumnReader* CreateTopLevelReader(const orc::Type* node,
       const TupleDescriptor* tuple_desc, HdfsOrcScanner* scanner);
 
-  /// Constructor for top-level readers
+  /// Constructor for indirect readers; they delegate reads to their children.
   OrcComplexColumnReader(const orc::Type* node, const TupleDescriptor* table_tuple_desc,
       HdfsOrcScanner* scanner);
 
-  /// Constructor for non top-level readers
+  /// Constructor for readers that write to a single slot of the given tuple. However,
+  /// this single slot might be a CollectionValue, in which case it materializes new
+  /// tuples for it which will be filled by its children.
   OrcComplexColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
-      HdfsOrcScanner* scanner) : OrcColumnReader(node, slot_desc, scanner) { }
+      HdfsOrcScanner* scanner) : OrcBatchedReader(node, slot_desc, scanner) { }
 
   virtual ~OrcComplexColumnReader() { }
 
@@ -470,24 +479,11 @@ class OrcComplexColumnReader : public OrcColumnReader {
 
   bool MaterializeTuple() const override { return materialize_tuple_; }
 
-  /// Whether we've finished reading the current orc batch.
-  bool EndOfBatch();
-
   Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     vbatch_ = orc_batch;
     return Status::OK();
   }
 
-  /// Checks if this complex column reader has a collection child.
-  bool HasCollectionChild() const;
-
-  /// Assemble current collection value (tracked by 'row_idx_') into a top level 'tuple'.
-  /// Depends on the UpdateInputBatch being called before (thus batch_ is updated)
-  virtual Status TransferTuple(Tuple* tuple, MemPool* pool) WARN_UNUSED_RESULT = 0;
-
-  virtual Status TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch, MemPool* pool)
-      WARN_UNUSED_RESULT = 0;
-
   /// Num of tuples inside the 'row_idx'-th row. LIST/MAP types will have 0 to N tuples.
   /// STRUCT type will always have one tuple.
   virtual int GetNumTuples(int row_idx) const = 0;
@@ -497,6 +493,10 @@ class OrcComplexColumnReader : public OrcColumnReader {
   virtual int GetChildBatchOffset(int row_idx) const = 0;
 
   const vector<OrcColumnReader*>& children() const { return children_; }
+
+  /// Returns true if this column reader writes to a slot directly. Returns false if it
+  /// delegates its job to its children. It might still write a position slot though.
+  bool DirectReader() const { return slot_desc_ != nullptr; }
  protected:
   vector<OrcColumnReader*> children_;
 
@@ -505,17 +505,26 @@ class OrcComplexColumnReader : public OrcColumnReader {
 
   bool materialize_tuple_ = false;
 
-  /// Keep row index if we're top level readers
-  int row_idx_;
-
   /// Convenient reference to 'batch_' of subclass.
   orc::ColumnVectorBatch* vbatch_ = nullptr;
-
-  /// Helper function for HasCollectionChild() to achieve recursion on the children
-  /// tree of 'reader'.
-  bool HasCollectionChildRecursive(const OrcColumnReader* reader) const;
 };
 
+/// Struct readers control the reading of struct fields.
+/// The ORC library always returns a struct reader to read the contents of a file. That's
+/// why our root reader is always an OrcStructReader. The root reader controls the
+/// materialization of the top-level tuples.
+/// It provides an interface that is convenient for the scanner to use.
+/// The usage pattern is more or less the following:
+///   while ( /* has new batch in the stripe */ ) {
+///     reader->UpdateInputBatch(orc_batch);
+///     while (!reader->EndOfBatch()) {
+///       InitScratchTuples();
+///       reader->TopLevelReadValueBatch(scratch_batch, mem_pool);
+///       TransferScratchTuples();
+///     }
+///   }
+/// 'TopLevelReadValueBatch' doesn't require a row index since the root reader will keep
+/// track of the row index.
 class OrcStructReader : public OrcComplexColumnReader {
  public:
   /// Constructor for top level reader
@@ -531,25 +540,36 @@ class OrcStructReader : public OrcComplexColumnReader {
 
   Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT;
 
-  Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
-
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) final WARN_UNUSED_RESULT;
 
-  Status TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch, MemPool* pool) override
+  Status TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch, MemPool* pool)
       WARN_UNUSED_RESULT;
 
   Status ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch, MemPool* pool,
       int scratch_batch_idx) override WARN_UNUSED_RESULT;
 
+  /// Whether we've finished reading the current orc batch.
+  bool EndOfBatch();
+
   int GetNumTuples(int row_idx) const override { return 1; }
 
   int GetChildBatchOffset(int row_idx) const override { return row_idx; }
+
+  int NumElements() const final {
+    if (MaterializeTuple()) return vbatch_->numElements;
+    DCHECK_EQ(children().size(), 1);
+    OrcColumnReader* child = children()[0];
+    return child->NumElements();
+  }
  private:
   orc::StructVectorBatch* batch_ = nullptr;
 
   /// Field ids of the children reader
   std::vector<int> children_fields_;
 
+  /// Keep row index if we're top level readers
+  int row_idx_;
+
   void SetNullSlot(Tuple* tuple) override {
     for (OrcColumnReader* child : children_) child->SetNullSlot(tuple);
   }
@@ -576,20 +596,22 @@ class OrcCollectionReader : public OrcComplexColumnReader {
 
   bool IsCollectionReader() const override { return true; }
 
-  Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
-
-  Status TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch, MemPool* pool) override
-      WARN_UNUSED_RESULT;
-
-  Status ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch, MemPool* pool,
-      int scratch_batch_idx) override WARN_UNUSED_RESULT;
-
   /// Assemble the given 'tuple' by reading children values into it. The corresponding
   /// children values are in the 'row_idx'-th collection. Each collection (List/Map) may
   /// have variable number of tuples, we only read children values of the 'tuple_idx'-th
   /// tuple.
   virtual Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
       MemPool* pool) const WARN_UNUSED_RESULT = 0;
+ protected:
+  void SetNullSlot(Tuple* tuple) override {
+    if (slot_desc_ != nullptr) {
+      OrcColumnReader::SetNullSlot(tuple);
+      return;
+    }
+    for (OrcColumnReader* child : children_) child->SetNullSlot(tuple);
+  }
+
+  Status AssembleCollection(int row_idx, Tuple* tuple, MemPool* pool) WARN_UNUSED_RESULT;
 };
 
 class OrcListReader : public OrcCollectionReader {
@@ -604,7 +626,7 @@ class OrcListReader : public OrcCollectionReader {
 
   Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT;
 
-  Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
+  Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) final WARN_UNUSED_RESULT;
 
   int GetNumTuples(int row_idx) const override;
 
@@ -612,18 +634,18 @@ class OrcListReader : public OrcCollectionReader {
 
   Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple, MemPool* pool)
       const override WARN_UNUSED_RESULT;
+
+  virtual int NumElements() const final;
  private:
+  Status SetPositionSlot(int row_idx, Tuple* tuple);
+
   orc::ListVectorBatch* batch_ = nullptr;
   const SlotDescriptor* pos_slot_desc_ = nullptr;
-  int array_start_ = -1;
-  int array_idx_ = -1;
-  int array_end_ = -1;
+  /// Keeps track of the list we are reading. It's needed for calculating the position
+  /// value.
+  int list_idx_ = 0;
 
   void CreateChildForSlot(const orc::Type* node, const SlotDescriptor* slot_desc);
-
-  /// Used for top level readers. Advance current position (row_idx_ and array_idx_)
-  /// to the first tuple inside next row.
-  void NextRow();
 };
 
 class OrcMapReader : public OrcCollectionReader {
@@ -638,7 +660,7 @@ class OrcMapReader : public OrcCollectionReader {
 
   Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT;
 
-  Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
+  Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) final WARN_UNUSED_RESULT;
 
   int GetNumTuples(int row_idx) const override;
 
@@ -647,17 +669,13 @@ class OrcMapReader : public OrcCollectionReader {
   Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple, MemPool* pool)
       const override WARN_UNUSED_RESULT;
 
+  virtual int NumElements() const final;
  private:
   orc::MapVectorBatch* batch_ = nullptr;
   vector<OrcColumnReader*> key_readers_;
   vector<OrcColumnReader*> value_readers_;
-  int array_offset_ = -1;
-  int array_end_ = -1;
 
   void CreateChildForSlot(const orc::Type* orc_type, const SlotDescriptor* slot_desc);
-
-  /// Used for top level readers. Advance current position (row_idx_ and array_offset_)
-  /// to the first key/value pair in next row.
-  void NextRow();
 };
-}
+
+} // namespace impala
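
The NumElements() contract documented in orc-column-readers.h above (a reader that
delegates its work reports the element count of whatever reader finally writes the
tuples) can be illustrated with one more self-contained toy; again this is not Impala
code, and ToyReader, ToyIntReader and ToyListReader are invented names:

    #include <cstdio>
    #include <vector>

    struct ToyReader {
      virtual ~ToyReader() = default;
      virtual int NumElements() const = 0;
    };

    // Leaf reader: reports the element count of its own batch.
    struct ToyIntReader : ToyReader {
      std::vector<int> values;
      int NumElements() const override { return static_cast<int>(values.size()); }
    };

    // List reader: if it materializes CollectionValues itself ('direct'), it reports the
    // number of lists; otherwise it reports whatever the reader it delegates to reports.
    struct ToyListReader : ToyReader {
      ToyListReader(bool direct, int num_lists, const ToyReader* child)
          : direct(direct), num_lists(num_lists), child(child) {}
      bool direct;
      int num_lists;
      const ToyReader* child;
      int NumElements() const override { return direct ? num_lists : child->NumElements(); }
    };

    int main() {
      // arr: array<array<array<int>>>, query "SELECT item FROM t.arr.item.item":
      // every list level delegates down to the int reader that writes the scalar items.
      ToyIntReader items;
      items.values = {1, 2, 3, 4, 5, 6, 7};
      ToyListReader inner(false, 4, &items);
      ToyListReader middle(false, 3, &inner);
      ToyListReader outer(false, 2, &middle);
      std::printf("%d\n", outer.NumElements());  // prints 7
      return 0;
    }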