You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2020/04/20 13:34:44 UTC
[impala] 02/02: IMPALA-9469: ORC scanner vectorization for
collection types
This is an automated email from the ASF dual-hosted git repository.
boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 248c6d2495d4628c10e6bdbb00f9ed170bba19b6
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Thu Apr 9 13:16:15 2020 +0200
IMPALA-9469: ORC scanner vectorization for collection types
This commit only keeps the batched read path of ORC columns, i.e.
from now on we always read ORC values into a scratch batch. Thanks to
this we also get codegen out of the box.
From now on, materialization of the table-level tuples is always driven
by the root struct reader. This will make it much easier to implement row
validation (against a valid write id list). It's needed
for IMPALA-9512.
I eliminated the OrcComplexColumnReader::TransferTuple() interface
and the related code. HdfsOrcScanner became simpler. Now it just calls
TopLevelReadValueBatch() on the root struct reader which tracks the
row index of the table-level tuples and calls ReadValueBatch on its
children accordingly. The children don't need to track the state
as they are always being told which row they need to read.
Testing:
* ran exhaustive tests
Performance:
* non-nested benchmark results stayed the same as expected
* Overall 1-2% gain on TPCH Nested, scale=1
** In some cases scanning was ~20% more efficient
Change-Id: I477961b427406035a04529c5175dbee8f8a93ad5
Reviewed-on: http://gerrit.cloudera.org:8080/15730
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/hdfs-orc-scanner.cc | 72 +++---------
be/src/exec/hdfs-orc-scanner.h | 6 +-
be/src/exec/orc-column-readers.cc | 174 +++++++++++++---------------
be/src/exec/orc-column-readers.h | 238 ++++++++++++++++++++------------------
4 files changed, 226 insertions(+), 264 deletions(-)
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index 0b1a599..7967cb2 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -533,8 +533,7 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
// 'TransferTuples' here.
if (!orc_root_reader_->EndOfBatch()) {
assemble_rows_timer_.Start();
- RETURN_IF_ERROR(TransferTuples(orc_root_reader_, row_batch,
- !orc_root_reader_->HasCollectionChild()));
+ RETURN_IF_ERROR(TransferTuples(row_batch));
assemble_rows_timer_.Stop();
if (row_batch->AtCapacity()) return Status::OK();
DCHECK(orc_root_reader_->EndOfBatch());
@@ -694,8 +693,7 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
num_rows_read += orc_root_batch_->numElements;
}
- RETURN_IF_ERROR(TransferTuples(orc_root_reader_, row_batch,
- !orc_root_reader_->HasCollectionChild()));
+ RETURN_IF_ERROR(TransferTuples(row_batch));
if (row_batch->AtCapacity()) break;
continue_execution &= !scan_node_->ReachedLimitShared() && !context_->cancelled();
}
@@ -707,62 +705,26 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
return Status::OK();
}
-Status HdfsOrcScanner::TransferTuples(OrcComplexColumnReader* coll_reader,
- RowBatch* dst_batch, bool do_batch_read) {
- if (!coll_reader->MaterializeTuple()) {
- // Top-level readers that are not materializing tuples will delegate the
- // materialization to its unique child.
- DCHECK_EQ(coll_reader->children().size(), 1);
- OrcColumnReader* child = coll_reader->children()[0];
- // Only complex type readers can be top-level readers.
- DCHECK(child->IsComplexColumnReader());
- return TransferTuples(
- static_cast<OrcComplexColumnReader*>(child), dst_batch, do_batch_read);
- }
- const TupleDescriptor* tuple_desc = scan_node_->tuple_desc();
-
- ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_->data();
- int num_conjuncts = conjunct_evals_->size();
-
+Status HdfsOrcScanner::TransferTuples(RowBatch* dst_batch) {
DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
if (tuple_ == nullptr) RETURN_IF_ERROR(AllocateTupleMem(dst_batch));
int row_id = dst_batch->num_rows();
int capacity = dst_batch->capacity();
- int num_to_commit = 0;
- TupleRow* dst_row = dst_batch->GetRow(row_id);
- Tuple* tuple = tuple_; // tuple_ is updated in CommitRows
-
- // TODO(IMPALA-6506): codegen the runtime filter + conjunct evaluation loop
- while (row_id < capacity && !coll_reader->EndOfBatch()) {
- if (do_batch_read) {
- DCHECK(scratch_batch_ != nullptr);
- DCHECK(scratch_batch_->AtEnd());
- RETURN_IF_ERROR(scratch_batch_->Reset(state_));
- InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem,
- scratch_batch_->capacity);
-
- RETURN_IF_ERROR(coll_reader->TopLevelReadValueBatch(scratch_batch_.get(),
- &scratch_batch_->aux_mem_pool));
- int num_tuples_transferred = TransferScratchTuples(dst_batch);
- row_id += num_tuples_transferred;
- num_to_commit += num_tuples_transferred;
- } else {
- if (tuple_desc->byte_size() > 0) DCHECK_LT((void*)tuple, (void*)tuple_mem_end_);
- InitTuple(tuple_desc, template_tuple_, tuple);
- RETURN_IF_ERROR(coll_reader->TransferTuple(tuple, dst_batch->tuple_data_pool()));
- dst_row->SetTuple(0, tuple);
- if (!EvalRuntimeFilters(dst_row)) continue;
- if (ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, dst_row)) {
- dst_row = next_row(dst_row);
- tuple = next_tuple(tuple_desc->byte_size(), tuple);
- ++row_id;
- ++num_to_commit;
- }
- }
+
+ while (row_id < capacity && !orc_root_reader_->EndOfBatch()) {
+ DCHECK(scratch_batch_ != nullptr);
+ DCHECK(scratch_batch_->AtEnd());
+ RETURN_IF_ERROR(scratch_batch_->Reset(state_));
+ InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem, scratch_batch_->capacity);
+ RETURN_IF_ERROR(orc_root_reader_->TopLevelReadValueBatch(scratch_batch_.get(),
+ &scratch_batch_->aux_mem_pool));
+ int num_tuples_transferred = TransferScratchTuples(dst_batch);
+ row_id += num_tuples_transferred;
+ VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch ($1 rows)",
+ num_tuples_transferred, dst_batch->num_rows());
+ RETURN_IF_ERROR(CommitRows(num_tuples_transferred, dst_batch));
}
- VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch ($1 rows)",
- num_to_commit, dst_batch->num_rows());
- return CommitRows(num_to_commit, dst_batch);
+ return Status::OK();
}
Status HdfsOrcScanner::AllocateTupleMem(RowBatch* row_batch) {
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 7412c22..728faee 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -261,11 +261,7 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
/// filters and conjuncts (if any) against the tuples. Only surviving tuples are added
/// to the given batch. Returns if either 'orc_root_batch_' is drained or 'dst_batch'
/// is full.
- /// 'do_batch_read' indicates if this function should transfer rows in batches or in a
- /// row-by-row manner. Batch processing is not used if 'column_reader' has a collection
- /// amongst its children.
- Status TransferTuples(OrcComplexColumnReader* column_reader,
- RowBatch* dst_batch, bool do_batch_read) WARN_UNUSED_RESULT;
+ Status TransferTuples(RowBatch* dst_batch) WARN_UNUSED_RESULT;
/// Process the file footer and parse file_metadata_. This should be called with the
/// last FOOTER_SIZE bytes in context_.
diff --git a/be/src/exec/orc-column-readers.cc b/be/src/exec/orc-column-readers.cc
index 872282e..72fe3f4 100644
--- a/be/src/exec/orc-column-readers.cc
+++ b/be/src/exec/orc-column-readers.cc
@@ -263,7 +263,7 @@ Status OrcDecimal16ColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* p
OrcComplexColumnReader::OrcComplexColumnReader(const orc::Type* node,
const TupleDescriptor* table_tuple_desc, HdfsOrcScanner* scanner)
- : OrcColumnReader(node, nullptr, scanner) {
+ : OrcBatchedReader(node, nullptr, scanner) {
SchemaPath& path = scanner->col_id_path_map_[node->getColumnId()];
if (path == table_tuple_desc->tuple_path()) tuple_desc_ = table_tuple_desc;
materialize_tuple_ = (tuple_desc_ != nullptr);
@@ -271,36 +271,12 @@ OrcComplexColumnReader::OrcComplexColumnReader(const orc::Type* node,
<< ": tuple_desc_=" << (tuple_desc_ ? tuple_desc_->DebugString() : "null");
}
-bool OrcComplexColumnReader::EndOfBatch() {
+bool OrcStructReader::EndOfBatch() {
DCHECK(slot_desc_ == nullptr
&& (tuple_desc_ == nullptr || tuple_desc_ == scanner_->scan_node_->tuple_desc()))
<< "Should be top level reader when calling EndOfBatch()";
- if (!materialize_tuple_) {
- // If this reader is not materializing tuples, its 'row_idx_' is invalid and the
- // progress is tracked in the child. Delegate the judgement to the child recursively.
- DCHECK_EQ(children_.size(), 1);
- return static_cast<OrcComplexColumnReader*>(children_[0])->EndOfBatch();
- }
- DCHECK(vbatch_ == nullptr || row_idx_ <= vbatch_->numElements);
- return vbatch_ == nullptr || row_idx_ == vbatch_->numElements;
-}
-
-bool OrcComplexColumnReader::HasCollectionChild() const {
- return HasCollectionChildRecursive(this);
-}
-
-bool OrcComplexColumnReader::HasCollectionChildRecursive(
- const OrcColumnReader* reader) const {
- DCHECK(reader != nullptr);
- if (reader->IsCollectionReader()) return true;
- if (!reader->IsComplexColumnReader()) return false;
- const OrcComplexColumnReader* complex_reader =
- static_cast<const OrcComplexColumnReader*>(reader);
- DCHECK(complex_reader == dynamic_cast<const OrcComplexColumnReader*>(reader));
- for (OrcColumnReader* child : complex_reader->children_) {
- if (HasCollectionChildRecursive(child)) return true;
- }
- return false;
+ DCHECK(vbatch_ == nullptr || row_idx_ <= NumElements());
+ return vbatch_ == nullptr || row_idx_ == NumElements();
}
inline bool PathContains(const SchemaPath& path, const SchemaPath& sub_path) {
@@ -381,6 +357,11 @@ OrcStructReader::OrcStructReader(const orc::Type* node,
}
Status OrcStructReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
+ if (!MaterializeTuple()) {
+ DCHECK_EQ(1, children_.size());
+ OrcColumnReader* child = children_[0];
+ return child->ReadValue(row_idx, tuple, pool);
+ }
if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
for (OrcColumnReader* child : children_) child->SetNullSlot(tuple);
return Status::OK();
@@ -415,7 +396,6 @@ Status OrcStructReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch,
Status OrcStructReader::ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch,
MemPool* pool, int scratch_batch_idx) {
for (OrcColumnReader* child : children_) {
- DCHECK(!child->IsCollectionReader());
RETURN_IF_ERROR(
child->ReadValueBatch(row_idx, scratch_batch, pool, scratch_batch_idx));
}
@@ -442,14 +422,6 @@ Status OrcStructReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
return Status::OK();
}
-Status OrcStructReader::TransferTuple(Tuple* tuple, MemPool* pool) {
- for (OrcColumnReader* child : children_) {
- RETURN_IF_ERROR(child->ReadValue(row_idx_, tuple, pool));
- }
- ++row_idx_;
- return Status::OK();
-}
-
OrcCollectionReader::OrcCollectionReader(const orc::Type* node,
const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner)
: OrcComplexColumnReader(node, slot_desc, scanner) {
@@ -464,7 +436,7 @@ OrcCollectionReader::OrcCollectionReader(const orc::Type* node,
}
}
-Status OrcCollectionReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
+Status OrcCollectionReader::AssembleCollection(int row_idx, Tuple* tuple, MemPool* pool) {
if (IsNull(DCHECK_NOTNULL(vbatch_), row_idx)) {
SetNullSlot(tuple);
return Status::OK();
@@ -476,15 +448,69 @@ Status OrcCollectionReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
return scanner_->AssembleCollection(*this, row_idx, &builder);
}
-Status OrcCollectionReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch,
- MemPool* pool) {
- DCHECK(false);
+int OrcListReader::NumElements() const {
+ if (DirectReader()) return batch_ != nullptr ? batch_->numElements : 0;
+ if (children_.empty()) {
+ return batch_ != nullptr ? batch_->offsets[batch_->numElements] : 0;
+ }
+ return children_[0]->NumElements();
+}
+
+Status OrcListReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
+ if (DirectReader()) return AssembleCollection(row_idx, tuple, pool);
+
+ for (OrcColumnReader* child : children_) {
+ RETURN_IF_ERROR(child->ReadValue(row_idx, tuple, pool));
+ }
+ if (pos_slot_desc_ != nullptr) {
+ RETURN_IF_ERROR(SetPositionSlot(row_idx, tuple));
+ }
return Status::OK();
}
-Status OrcCollectionReader::ReadValueBatch(int row_idx,
- ScratchTupleBatch* scratch_batch, MemPool* pool, int scratch_batch_idx) {
- DCHECK(false);
+Status OrcMapReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
+ if (DirectReader()) return AssembleCollection(row_idx, tuple, pool);
+
+ for (OrcColumnReader* child : children_) {
+ RETURN_IF_ERROR(child->ReadValue(row_idx, tuple, pool));
+ }
+ return Status::OK();
+}
+
+Status OrcListReader::SetPositionSlot(int row_idx, Tuple* tuple) {
+ DCHECK(pos_slot_desc_ != nullptr);
+
+ int64_t pos = -1;
+ DCHECK_LT(list_idx_, batch_->numElements);
+ if (list_idx_ == batch_->numElements - 1 ||
+ (batch_->offsets[list_idx_] <= row_idx && row_idx < batch_->offsets[list_idx_+1])) {
+ // We are somewhere in the current list.
+ pos = row_idx - batch_->offsets[list_idx_];
+ } else if (row_idx == batch_->offsets[list_idx_+1]) {
+ // Let's move to the next list.
+ pos = 0;
+ list_idx_ += 1;
+ }
+ else if (row_idx > batch_->offsets[list_idx_+1]) {
+ // We lagged behind. Let's find our list.
+ for (int i = list_idx_; i < batch_->numElements; ++i) {
+ if (row_idx < batch_->offsets[i+1]) {
+ pos = row_idx - batch_->offsets[i];
+ list_idx_ = i;
+ break;
+ }
+ }
+ }
+ if (pos < 0) {
+ // Oops, something went wrong. It can be caused by a corrupt file, so let's raise
+ // an error.
+ return Status(Substitute(
+ "ORC list indexes and elements are inconsistent in file $0",
+ scanner_->filename()));
+ }
+ int64_t* slot_val_ptr = reinterpret_cast<int64_t*>(tuple->GetSlot(
+ pos_slot_desc_->tuple_offset()));
+ *slot_val_ptr = pos;
return Status::OK();
}
@@ -545,10 +571,7 @@ Status OrcListReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
for (OrcColumnReader* child : children_) {
RETURN_IF_ERROR(child->UpdateInputBatch(item_batch));
}
- if (batch_) {
- row_idx_ = -1;
- NextRow();
- }
+ list_idx_ = 0;
return Status::OK();
}
@@ -562,20 +585,6 @@ int OrcListReader::GetChildBatchOffset(int row_idx) const {
return batch_->offsets[row_idx];
}
-Status OrcListReader::TransferTuple(Tuple* tuple, MemPool* pool) {
- if (pos_slot_desc_) {
- int64_t* slot_val_ptr = reinterpret_cast<int64_t*>(
- tuple->GetSlot(pos_slot_desc_->tuple_offset()));
- *slot_val_ptr = array_idx_;
- }
- for (OrcColumnReader* child : children_) {
- RETURN_IF_ERROR(child->ReadValue(array_start_ + array_idx_, tuple, pool));
- }
- array_idx_++;
- if (array_start_ + array_idx_ >= array_end_) NextRow();
- return Status::OK();
-}
-
Status OrcListReader::ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
MemPool* pool) const {
DCHECK_LT(row_idx, batch_->numElements);
@@ -591,16 +600,6 @@ Status OrcListReader::ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple
return Status::OK();
}
-void OrcListReader::NextRow() {
- do {
- ++row_idx_;
- if (row_idx_ >= batch_->numElements) break;
- array_start_ = batch_->offsets[row_idx_];
- array_end_ = batch_->offsets[row_idx_ + 1];
- } while (IsNull(batch_, row_idx_) || array_start_ == array_end_);
- array_idx_ = 0;
-}
-
void OrcMapReader::CreateChildForSlot(const orc::Type* node,
const SlotDescriptor* slot_desc) {
const SchemaPath& path = scanner_->col_id_path_map_[node->getColumnId()];
@@ -686,22 +685,9 @@ Status OrcMapReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
for (OrcColumnReader* child : value_readers_) {
RETURN_IF_ERROR(child->UpdateInputBatch(value_batch));
}
- if (batch_) {
- row_idx_ = -1;
- NextRow();
- }
return Status::OK();
}
-void OrcMapReader::NextRow() {
- do {
- ++row_idx_;
- if (row_idx_ >= batch_->numElements) break;
- array_offset_ = batch_->offsets[row_idx_];
- array_end_ = batch_->offsets[row_idx_ + 1];
- } while (IsNull(batch_, row_idx_) || array_offset_ == array_end_);
-}
-
int OrcMapReader::GetNumTuples(int row_idx) const {
if (IsNull(batch_, row_idx)) return 0;
DCHECK_GT(batch_->offsets.size(), row_idx + 1);
@@ -712,15 +698,6 @@ int OrcMapReader::GetChildBatchOffset(int row_idx) const {
return batch_->offsets[row_idx];
}
-Status OrcMapReader::TransferTuple(Tuple* tuple, MemPool* pool) {
- for (OrcColumnReader* child : children_) {
- RETURN_IF_ERROR(child->ReadValue(array_offset_, tuple, pool));
- }
- array_offset_++;
- if (array_offset_ >= array_end_) NextRow();
- return Status::OK();
-}
-
Status OrcMapReader::ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
MemPool* pool) const {
DCHECK_LT(row_idx, batch_->numElements);
@@ -730,4 +707,13 @@ Status OrcMapReader::ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
}
return Status::OK();
}
+
+int OrcMapReader::NumElements() const {
+ if (DirectReader()) return batch_ != nullptr ? batch_->numElements : 0;
+ if (children_.empty()) {
+ return batch_ != nullptr ? batch_->offsets[batch_->numElements] : 0;
+ }
+ return children_[0]->NumElements();
}
+
+} // namespace impala
diff --git a/be/src/exec/orc-column-readers.h b/be/src/exec/orc-column-readers.h
index 0e6a0fc..001b913 100644
--- a/be/src/exec/orc-column-readers.h
+++ b/be/src/exec/orc-column-readers.h
@@ -47,9 +47,9 @@ class HdfsOrcScanner;
/// }
/// }
///
-/// For complex types readers, they can be top-level readers (readers materializing
-/// table level tuples), so we need more interface to deal with table/collection level
-/// tuple materialization. See more in the class comments of OrcComplexColumnReader.
+/// The root reader is always an OrcStructReader, it drives the materialization of the
+/// table level tuples, so we need more interface to deal with table/collection level
+/// tuple materialization. See more in the class comments of OrcStructReader.
class OrcColumnReader {
public:
/// Create a column reader for the given 'slot_desc' based on the ORC 'node'. We say
@@ -65,7 +65,7 @@ class OrcColumnReader {
static OrcColumnReader* Create(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner);
- /// Base constructor for all types of readers that hold a SlotDescriptor (non top-level
+ /// Base constructor for all types of readers that hold a SlotDescriptor (direct
/// readers). Primitive column readers will materialize values into the slot. STRUCT
/// column readers will delegate the slot materialization to its children. Collection
/// column (ARRAY/MAP) readers will create CollectionValue in the slot and assemble
@@ -101,8 +101,23 @@ class OrcColumnReader {
virtual Status ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch,
MemPool* pool, int scratch_batch_idx) WARN_UNUSED_RESULT = 0;
+ /// Returns the number of tuples this reader directly or indirectly writes. E.g. if it
+ /// is a primitive reader it returns 'batch_->numElements'. If it is a collection
+ /// reader then it depends on whether it writes the tuple directly or indirectly. If it
+ /// writes the tuple directly, i.e. it creates a collection value, then it also returns
+ /// 'batch_->numElements' which is the number of collection values (lists, maps) it
+ /// writes. On the other hand, if it writes the tuple indirectly, i.e. it delegates the
+ /// writing to its child then it returns child->NumElements().
+ /// E.g. if there's a column named 'arr', and its type is array<array<array<int>>>, and
+ /// the query is
+ /// "SELECT item from t.arr.item.item"
+ /// then NumElements() for the top OrcListReader returns the number of elements in the
+ /// OrcIntColumnReader in the bottom.
+ virtual int NumElements() const = 0;
+
protected:
friend class OrcStructReader;
+ friend class OrcCollectionReader;
/// Convenient field for debug. We can't keep the pointer of orc::Type since they'll be
/// destroyed after orc::RowReader was released. Only keep the id orc::Type here.
@@ -128,47 +143,64 @@ class OrcColumnReader {
}
};
-/// The main purpose of this class other than providing implementations relevant only for
-/// primitive type column readers is to implement static polymorphism via the "curiously
-/// recurring template pattern". All the derived classes are expected to provide
-/// themselves as the template parameter of this class. As a result the number of virtual
+/// The main purpose of this class is to implement static polymorphism via the "curiously
+/// recurring template pattern". All the derived classes are expected to provide a
+/// subclass as the template parameter of this class. As a result the number of virtual
/// function calls can be reduced in ReadValueBatch() as we can directly call non-virtual
/// ReadValue() of the derived class.
-template<class T>
-class OrcPrimitiveColumnReader : public OrcColumnReader {
+template<class Final>
+class OrcBatchedReader : public OrcColumnReader {
public:
- OrcPrimitiveColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
+ OrcBatchedReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner)
: OrcColumnReader(node, slot_desc, scanner) {}
- virtual ~OrcPrimitiveColumnReader() { }
-
- bool IsComplexColumnReader() const override { return false; }
-
- bool IsCollectionReader() const override { return false; }
-
- bool MaterializeTuple() const override { return true; }
-
Status ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch, MemPool* pool,
int scratch_batch_idx) override WARN_UNUSED_RESULT {
- T* derived = static_cast<T*>(this);
+ Final* final = this->GetFinal();
int num_to_read = std::min<int>(scratch_batch->capacity - scratch_batch_idx,
- derived->batch_->numElements - row_idx);
- DCHECK_LE(row_idx + num_to_read, derived->batch_->numElements);
+ final->NumElements() - row_idx);
+ DCHECK_LE(row_idx + num_to_read, final->NumElements());
for (int i = 0; i < num_to_read; ++i) {
int scratch_batch_pos = i + scratch_batch_idx;
uint8_t* next_tuple = scratch_batch->tuple_mem +
scratch_batch_pos * OrcColumnReader::scanner_->tuple_byte_size();
Tuple* tuple = reinterpret_cast<Tuple*>(next_tuple);
-
- // Make sure that each ReadValue() is final in each derived class. This way
- // devirtualization helps to reduce the number of virtual function calls, and as a
- // result to improve performance.
- RETURN_IF_ERROR(derived->ReadValue(row_idx + i, tuple, pool));
+ // The compiler will devirtualize the call to ReadValue() if it is marked 'final' in
+ // the 'Final' class. This way we can reduce the number of virtual function calls
+ // to improve performance.
+ RETURN_IF_ERROR(final->ReadValue(row_idx + i, tuple, pool));
}
scratch_batch->num_tuples = scratch_batch_idx + num_to_read;
return Status::OK();
}
+
+ Final* GetFinal() { return static_cast<Final*>(this); }
+ const Final* GetFinal() const { return static_cast<const Final*>(this); }
+};
+
+/// Base class for primitive types. It implements the common functions with 'final'
+/// annotation. Template parameter 'Final' holds a concrete primitive column reader and is
+/// propagated to OrcBatchedReader.
+template<class Final>
+class OrcPrimitiveColumnReader : public OrcBatchedReader<Final> {
+ public:
+ OrcPrimitiveColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
+ HdfsOrcScanner* scanner)
+ : OrcBatchedReader<Final>(node, slot_desc, scanner) {}
+ virtual ~OrcPrimitiveColumnReader() {}
+
+ bool IsComplexColumnReader() const final { return false; }
+
+ bool IsCollectionReader() const final { return false; }
+
+ bool MaterializeTuple() const final { return true; }
+
+ int NumElements() const final {
+ const Final* final = this->GetFinal();
+ if (final->batch_ == nullptr) return 0;
+ return final->batch_->numElements;
+ }
};
class OrcBoolColumnReader : public OrcPrimitiveColumnReader<OrcBoolColumnReader> {
@@ -413,56 +445,33 @@ class OrcDecimal16ColumnReader
/// sub queries). The root reader is always an OrcStructReader since the root of the ORC
/// schema is represented as a STRUCT type.
///
-/// Only OrcComplexColumnReaders can be top-level readers: readers that control the
-/// materialization of the top-level tuples, whether directly or indirectly (by its
-/// unique child).
+/// For collection readers, they can be divided into two kinds by whether they should
+/// materialize collection tuples (reflected by materialize_tuple_). (STRUCTs always
+/// delegate materialization to their children.)
///
-/// There're only one top-level reader that directly materializes top-level(table-level)
-/// tuples: the reader whose orc_node matches the tuple_path of the top-level
-/// TupleDescriptor. For the only top-level reader that directly materializes top-level
-/// tuples, the usage of the interfaces follows the pattern:
-/// while ( /* has new batch in the stripe */ ) {
-/// reader->UpdateInputBatch(orc_batch);
-/// while (!reader->EndOfBatch()) {
-/// tuple = ... // Init tuple
-/// reader->ReadValueBatch(scratch_batch, mem_pool);
-/// }
-/// }
-/// 'ReadValueBatch' don't require a row index since the top-level reader will keep
-/// track of the progress by internal fields:
-/// * STRUCT reader: row_idx_
-/// * LIST reader: row_idx_, array_start_, array_idx_, array_end_
-/// * MAP reader: row_idx_, array_offset_, array_end_
-///
-/// For top-level readers that indirectly materializes tuples, they are ancestors of the
-/// above reader. Such kind of readers just UpdateInputBatch (so update children's
-/// recursively) and then delegate the materialization to their children. (See more in
-/// HdfsOrcScanner::TransferTuples)
-///
-/// For non top-level readers, they can be divided into two kinds by whether they should
-/// materialize collection tuples (reflected by materialize_tuple_). STRUCT is not a
-/// collection type so non top-level STRUCT readers always have materialize_tuple_ being
-/// false as default.
-///
-/// For non top-level collection type readers, they create a CollectionValue and a
+/// For collection type readers that materialize a CollectionValue they create a
/// CollectionValueBuilder when 'ReadValue' is called. Then recursively delegate the
/// materialization of collection tuples to the child that matches the TupleDescriptor.
/// This child tracks the boundary of current collection and call 'ReadChildrenValue' to
-/// assemble collection tuples. (See more in HdfsOrcScanner::AssembleCollection)
+/// assemble collection tuples. (See more in HdfsOrcScanner::AssembleCollection).
+/// If they don't materialize collection values then they just delegate the reading to
+/// their children.
///
/// Children readers are created in the constructor recursively.
-class OrcComplexColumnReader : public OrcColumnReader {
+class OrcComplexColumnReader : public OrcBatchedReader<OrcComplexColumnReader> {
public:
static OrcComplexColumnReader* CreateTopLevelReader(const orc::Type* node,
const TupleDescriptor* tuple_desc, HdfsOrcScanner* scanner);
- /// Constructor for top-level readers
+ /// Constructor for indirect readers, they delegate reads to their children.
OrcComplexColumnReader(const orc::Type* node, const TupleDescriptor* table_tuple_desc,
HdfsOrcScanner* scanner);
- /// Constructor for non top-level readers
+ /// Constructor for readers that write to a single slot of the given tuple. However,
+ /// this single slot might be a CollectionValue, in which case it materializes new
+ /// tuples for it which will be filled by its children.
OrcComplexColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
- HdfsOrcScanner* scanner) : OrcColumnReader(node, slot_desc, scanner) { }
+ HdfsOrcScanner* scanner) : OrcBatchedReader(node, slot_desc, scanner) { }
virtual ~OrcComplexColumnReader() { }
@@ -470,24 +479,11 @@ class OrcComplexColumnReader : public OrcColumnReader {
bool MaterializeTuple() const override { return materialize_tuple_; }
- /// Whether we've finished reading the current orc batch.
- bool EndOfBatch();
-
Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
vbatch_ = orc_batch;
return Status::OK();
}
- /// Checks if this complex column reader has a collection child.
- bool HasCollectionChild() const;
-
- /// Assemble current collection value (tracked by 'row_idx_') into a top level 'tuple'.
- /// Depends on the UpdateInputBatch being called before (thus batch_ is updated)
- virtual Status TransferTuple(Tuple* tuple, MemPool* pool) WARN_UNUSED_RESULT = 0;
-
- virtual Status TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch, MemPool* pool)
- WARN_UNUSED_RESULT = 0;
-
/// Num of tuples inside the 'row_idx'-th row. LIST/MAP types will have 0 to N tuples.
/// STRUCT type will always have one tuple.
virtual int GetNumTuples(int row_idx) const = 0;
@@ -497,6 +493,10 @@ class OrcComplexColumnReader : public OrcColumnReader {
virtual int GetChildBatchOffset(int row_idx) const = 0;
const vector<OrcColumnReader*>& children() const { return children_; }
+
+ /// Returns true if this column reader writes to a slot directly. Returns false if it
+ /// delegates its job to its children. It might still write a position slot though.
+ bool DirectReader() const { return slot_desc_ != nullptr; }
protected:
vector<OrcColumnReader*> children_;
@@ -505,17 +505,26 @@ class OrcComplexColumnReader : public OrcColumnReader {
bool materialize_tuple_ = false;
- /// Keep row index if we're top level readers
- int row_idx_;
-
/// Convenient reference to 'batch_' of subclass.
orc::ColumnVectorBatch* vbatch_ = nullptr;
-
- /// Helper function for HasCollectionChild() to achieve recursion on the children
- /// tree of 'reader'.
- bool HasCollectionChildRecursive(const OrcColumnReader* reader) const;
};
+/// Struct readers control the reading of struct fields.
+/// The ORC library always returns a struct reader to read the contents of a file. That's
+/// why our root reader is always an OrcStructReader. The root reader controls the
+/// materialization of the top-level tuples.
+/// It provides an interface that is convenient to use by the scanner.
+/// The usage pattern is more or less the following:
+/// while ( /* has new batch in the stripe */ ) {
+/// reader->UpdateInputBatch(orc_batch);
+/// while (!reader->EndOfBatch()) {
+/// InitScratchTuples();
+/// reader->TopLevelReadValueBatch(scratch_batch, mem_pool);
+/// TransferScratchTuples();
+/// }
+/// }
+/// 'TopLevelReadValueBatch' doesn't require a row index since the root reader will keep
+/// track of the row index.
class OrcStructReader : public OrcComplexColumnReader {
public:
/// Constructor for top level reader
@@ -531,25 +540,36 @@ class OrcStructReader : public OrcComplexColumnReader {
Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT;
- Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
-
Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) final WARN_UNUSED_RESULT;
- Status TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch, MemPool* pool) override
+ Status TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch, MemPool* pool)
WARN_UNUSED_RESULT;
Status ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch, MemPool* pool,
int scratch_batch_idx) override WARN_UNUSED_RESULT;
+ /// Whether we've finished reading the current orc batch.
+ bool EndOfBatch();
+
int GetNumTuples(int row_idx) const override { return 1; }
int GetChildBatchOffset(int row_idx) const override { return row_idx; }
+
+ int NumElements() const final {
+ if (MaterializeTuple()) return vbatch_->numElements;
+ DCHECK_EQ(children().size(), 1);
+ OrcColumnReader* child = children()[0];
+ return child->NumElements();
+ }
private:
orc::StructVectorBatch* batch_ = nullptr;
/// Field ids of the children reader
std::vector<int> children_fields_;
+ /// Keep row index if we're top level readers
+ int row_idx_;
+
void SetNullSlot(Tuple* tuple) override {
for (OrcColumnReader* child : children_) child->SetNullSlot(tuple);
}
@@ -576,20 +596,22 @@ class OrcCollectionReader : public OrcComplexColumnReader {
bool IsCollectionReader() const override { return true; }
- Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
-
- Status TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch, MemPool* pool) override
- WARN_UNUSED_RESULT;
-
- Status ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch, MemPool* pool,
- int scratch_batch_idx) override WARN_UNUSED_RESULT;
-
/// Assemble the given 'tuple' by reading children values into it. The corresponding
/// children values are in the 'row_idx'-th collection. Each collection (List/Map) may
/// have variable number of tuples, we only read children values of the 'tuple_idx'-th
/// tuple.
virtual Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
MemPool* pool) const WARN_UNUSED_RESULT = 0;
+ protected:
+ void SetNullSlot(Tuple* tuple) override {
+ if (slot_desc_ != nullptr) {
+ OrcColumnReader::SetNullSlot(tuple);
+ return;
+ }
+ for (OrcColumnReader* child : children_) child->SetNullSlot(tuple);
+ }
+
+ Status AssembleCollection(int row_idx, Tuple* tuple, MemPool* pool) WARN_UNUSED_RESULT;
};
class OrcListReader : public OrcCollectionReader {
@@ -604,7 +626,7 @@ class OrcListReader : public OrcCollectionReader {
Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT;
- Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
+ Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) final WARN_UNUSED_RESULT;
int GetNumTuples(int row_idx) const override;
@@ -612,18 +634,18 @@ class OrcListReader : public OrcCollectionReader {
Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple, MemPool* pool)
const override WARN_UNUSED_RESULT;
+
+ virtual int NumElements() const final;
private:
+ Status SetPositionSlot(int row_idx, Tuple* tuple);
+
orc::ListVectorBatch* batch_ = nullptr;
const SlotDescriptor* pos_slot_desc_ = nullptr;
- int array_start_ = -1;
- int array_idx_ = -1;
- int array_end_ = -1;
+ /// Keeps track of the list we are reading. It's needed for calculating the position
+ /// value.
+ int list_idx_ = 0;
void CreateChildForSlot(const orc::Type* node, const SlotDescriptor* slot_desc);
-
- /// Used for top level readers. Advance current position (row_idx_ and array_idx_)
- /// to the first tuple inside next row.
- void NextRow();
};
class OrcMapReader : public OrcCollectionReader {
@@ -638,7 +660,7 @@ class OrcMapReader : public OrcCollectionReader {
Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT;
- Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
+ Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) final WARN_UNUSED_RESULT;
int GetNumTuples(int row_idx) const override;
@@ -647,17 +669,13 @@ class OrcMapReader : public OrcCollectionReader {
Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple, MemPool* pool)
const override WARN_UNUSED_RESULT;
+ virtual int NumElements() const final;
private:
orc::MapVectorBatch* batch_ = nullptr;
vector<OrcColumnReader*> key_readers_;
vector<OrcColumnReader*> value_readers_;
- int array_offset_ = -1;
- int array_end_ = -1;
void CreateChildForSlot(const orc::Type* orc_type, const SlotDescriptor* slot_desc);
-
- /// Used for top level readers. Advance current position (row_idx_ and array_offset_)
- /// to the first key/value pair in next row.
- void NextRow();
};
-}
+
+} // namespace impala