You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/05/04 15:17:04 UTC
[7/8] impala git commit: IMPALA-4123 (prep): Parquet column reader cleanup
IMPALA-4123 (prep): Parquet column reader cleanup
Some miscellaneous cleanup to make the Parquet scanner easier to understand
and easier to change in the future.
A lot of the refactoring is about separating functions more cleanly
so that each has a clearer purpose, e.g.:
* Functions that strictly do decoding, i.e. materialize values, convert
and validate them. These are changed to operate on single values, not tuples.
* Functions that are used for the non-batched decoding path (i.e. driven
by CollectionColumnReader or BoolColumnReader).
* Functions that dispatch to a templated implementation based on one or
more runtime values.
Other misc changes:
* Move large functions out of class bodies.
* Use parquet::Encoding instead of bool to indicate encoding.
* Add some additional DCHECKs.
Testing:
* Ran exhaustive tests
* Ran fuzz test in a loop
Change-Id: Ibc00352df3a0b2d605f872ae7e43db2dc90faab1
Reviewed-on: http://gerrit.cloudera.org:8080/9799
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/13604bd4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/13604bd4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/13604bd4
Branch: refs/heads/2.x
Commit: 13604bd4e1103559f6644828329f4bd3b6a55d54
Parents: 9d36d38
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Oct 11 23:58:19 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 3 19:59:26 2018 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-parquet-scanner.cc | 10 +-
be/src/exec/parquet-column-readers.cc | 833 ++++++++++++++++-------------
be/src/exec/parquet-column-readers.h | 25 +-
be/src/runtime/tuple.h | 8 +
be/src/util/bit-stream-utils.h | 2 +
be/src/util/rle-encoding.h | 2 +
6 files changed, 482 insertions(+), 398 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/13604bd4/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index b40816d..8cec3d2 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -362,8 +362,8 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
InitTuple(template_tuple_, dst_tuple);
- int64_t* dst_slot = reinterpret_cast<int64_t*>(dst_tuple->GetSlot(
- scan_node_->parquet_count_star_slot_offset()));
+ int64_t* dst_slot =
+ dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset());
*dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows;
row_group_rows_read_ += *dst_slot;
dst_row->SetTuple(0, dst_tuple);
@@ -1142,7 +1142,11 @@ inline bool HdfsParquetScanner::ReadCollectionItem(
FILE_CHECK_GE(col_reader->def_level(),
col_reader->def_level_of_immediate_repeated_ancestor());
// Fill in position slot if applicable
- if (col_reader->pos_slot_desc() != nullptr) col_reader->ReadPosition(tuple);
+ const SlotDescriptor* pos_slot_desc = col_reader->pos_slot_desc();
+ if (pos_slot_desc != nullptr) {
+ col_reader->ReadPositionNonBatched(
+ tuple->GetBigIntSlot(pos_slot_desc->tuple_offset()));
+ }
continue_execution = col_reader->ReadValue(pool, tuple);
} else {
// A containing repeated field is empty or NULL
http://git-wip-us.apache.org/repos/asf/impala/blob/13604bd4/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 101f690..f403ce6 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -75,14 +75,18 @@ static int debug_count = 0;
using namespace impala::io;
+using parquet::Encoding;
+
namespace impala {
const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
"ParquetColumnReader::$0() failed to allocate $1 bytes for $2.";
Status ParquetLevelDecoder::Init(const string& filename,
- parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
+ Encoding::type encoding, MemPool* cache_pool, int cache_size,
int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
+ DCHECK(*data != nullptr);
+ DCHECK_GE(*data_size, 0);
DCHECK_GE(num_buffered_values, 0);
DCHECK_GT(cache_size, 0);
cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
@@ -97,7 +101,7 @@ Status ParquetLevelDecoder::Init(const string& filename,
int32_t num_bytes = 0;
switch (encoding) {
- case parquet::Encoding::RLE: {
+ case Encoding::RLE: {
Status status;
if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) {
return status;
@@ -136,15 +140,15 @@ Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) {
num_cached_levels_ = 0;
cached_level_idx_ = 0;
// Memory has already been allocated.
- if (cached_levels_ != NULL) {
+ if (cached_levels_ != nullptr) {
DCHECK_EQ(cache_size_, cache_size);
return Status::OK();
}
cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
- if (cached_levels_ == NULL) {
+ if (cached_levels_ == nullptr) {
return pool->mem_tracker()->MemLimitExceeded(
- NULL, "Definition level cache", cache_size);
+ nullptr, "Definition level cache", cache_size);
}
memset(cached_levels_, 0, cache_size);
cache_size_ = cache_size;
@@ -211,98 +215,41 @@ bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) {
/// If MATERIALIZED is true, the column values are materialized into the slot described
/// by slot_desc. If MATERIALIZED is false, the column values are not materialized, but
/// the position can be accessed.
-template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
class ScalarColumnReader : public BaseScalarColumnReader {
public:
ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
- const SlotDescriptor* slot_desc)
- : BaseScalarColumnReader(parent, node, slot_desc),
- dict_decoder_(parent->scan_node_->mem_tracker()),
- dict_decoder_init_(false),
- needs_conversion_(false) {
- if (!MATERIALIZED) {
- // We're not materializing any values, just counting them. No need (or ability) to
- // initialize state used to materialize values.
- DCHECK(slot_desc_ == NULL);
- return;
- }
-
- DCHECK(slot_desc_ != NULL);
- DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
- if (slot_desc_->type().type == TYPE_DECIMAL
- && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
- fixed_len_size_ = node.element->type_length;
- } else if (slot_desc_->type().type == TYPE_VARCHAR) {
- fixed_len_size_ = slot_desc_->type().len;
- } else {
- fixed_len_size_ = -1;
- }
- needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
- // TODO: Add logic to detect file versions that have unconverted TIMESTAMP
- // values. Currently all versions have converted values.
- (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
- slot_desc_->type().type == TYPE_TIMESTAMP &&
- parent->file_version_.application == "parquet-mr");
- }
-
+ const SlotDescriptor* slot_desc);
virtual ~ScalarColumnReader() { }
- virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
- return ReadValue<true>(pool, tuple);
+ virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
+ return ReadValue<true>(tuple);
}
- virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
- return ReadValue<false>(pool, tuple);
+ virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
+ return ReadValue<false>(tuple);
}
virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
- uint8_t* tuple_mem, int* num_values) {
- return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values);
+ uint8_t* tuple_mem, int* num_values) override {
+ return ReadValueBatch<true>(max_values, tuple_size, tuple_mem, num_values);
}
virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
- uint8_t* tuple_mem, int* num_values) {
- return ReadValueBatch<false>(pool, max_values, tuple_size, tuple_mem, num_values);
+ uint8_t* tuple_mem, int* num_values) override {
+ return ReadValueBatch<false>(max_values, tuple_size, tuple_mem, num_values);
}
- virtual DictDecoderBase* GetDictionaryDecoder() {
+ virtual DictDecoderBase* GetDictionaryDecoder() override {
return HasDictionaryDecoder() ? &dict_decoder_ : nullptr;
}
- virtual bool NeedsConversion() { return NeedsConversionInline(); }
- virtual bool NeedsValidation() { return NeedsValidationInline(); }
+ virtual bool NeedsConversion() override { return NeedsConversionInline(); }
+ virtual bool NeedsValidation() override { return NeedsValidationInline(); }
protected:
- template<bool IN_COLLECTION>
- inline bool ReadValue(MemPool* pool, Tuple* tuple) {
- // NextLevels() should have already been called and def and rep levels should be in
- // valid range.
- DCHECK_GE(rep_level_, 0);
- DCHECK_GE(def_level_, 0);
- DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
- "Caller should have called NextLevels() until we are ready to read a value";
-
- if (MATERIALIZED) {
- if (def_level_ >= max_def_level()) {
- if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
- if (NeedsConversionInline()) {
- if (!ReadSlot<true, true>(tuple, pool)) return false;
- } else {
- if (!ReadSlot<true, false>(tuple, pool)) return false;
- }
- } else {
- if (NeedsConversionInline()) {
- if (!ReadSlot<false, true>(tuple, pool)) return false;
- } else {
- if (!ReadSlot<false, false>(tuple, pool)) return false;
- }
- }
- } else {
- tuple->SetNull(null_indicator_offset_);
- }
- }
- return NextLevels<IN_COLLECTION>();
- }
+ template <bool IN_COLLECTION>
+ inline bool ReadValue(Tuple* tuple);
/// Implementation of the ReadValueBatch() functions specialized for this
/// column reader type. This function drives the reading of data pages and
@@ -315,74 +262,14 @@ class ScalarColumnReader : public BaseScalarColumnReader {
/// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
/// -fno-strict-alias).
template <bool IN_COLLECTION>
- bool ReadValueBatch(MemPool* RESTRICT pool, int max_values, int tuple_size,
- uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
- // Repetition level is only present if this column is nested in a collection type.
- if (!IN_COLLECTION) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
- if (IN_COLLECTION) DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
-
- int val_count = 0;
- bool continue_execution = true;
- while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
- DCHECK_GE(num_buffered_values_, 0);
- // Read next page if necessary.
- if (num_buffered_values_ == 0) {
- if (!NextPage()) {
- continue_execution = parent_->parse_status_.ok();
- continue;
- }
- }
-
- // Not materializing anything - skip decoding any levels and rely on the value
- // count from page metadata to return the correct number of rows.
- if (!MATERIALIZED && !IN_COLLECTION) {
- int vals_to_add = min(num_buffered_values_, max_values - val_count);
- val_count += vals_to_add;
- num_buffered_values_ -= vals_to_add;
- continue;
- }
- // Fill the rep level cache if needed. We are flattening out the fields of the
- // nested collection into the top-level tuple returned by the scan, so we don't
- // care about the nesting structure unless the position slot is being populated.
- if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) {
- parent_->parse_status_.MergeStatus(
- rep_levels_.CacheNextBatch(num_buffered_values_));
- if (UNLIKELY(!parent_->parse_status_.ok())) return false;
- }
-
- // Fill def level cache if needed.
- if (!def_levels_.CacheHasNext()) {
- // TODO: add a fast path here if there's a run of repeated values.
- parent_->parse_status_.MergeStatus(
- def_levels_.CacheNextBatch(num_buffered_values_));
- if (UNLIKELY(!parent_->parse_status_.ok())) return false;
- }
-
- // Read data page and cached levels to materialize values.
- int cache_start_idx = def_levels_.CacheCurrIdx();
- uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
- int remaining_val_capacity = max_values - val_count;
- int ret_val_count = 0;
- if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
- continue_execution = MaterializeValueBatch<IN_COLLECTION, true>(
- pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
- } else {
- continue_execution = MaterializeValueBatch<IN_COLLECTION, false>(
- pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
- }
- val_count += ret_val_count;
- num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
- if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
- continue_execution &= ColReaderDebugAction(&val_count);
- }
- }
- *num_values = val_count;
- return continue_execution;
- }
+ bool ReadValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+ int* RESTRICT num_values) RESTRICT;
/// Helper function for ReadValueBatch() above that performs value materialization.
/// It assumes a data page with remaining values is available, and that the def/rep
- /// level caches have been populated.
+ /// level caches have been populated. Materializes values into 'tuple_mem' with a
+ /// stride of 'tuple_size' and updates 'num_buffered_values_'. Returns the number of
+ /// values materialized in 'num_values'.
/// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not
/// handled in this function.
/// Use RESTRICT so that the compiler knows that it is safe to cache member
@@ -390,68 +277,18 @@ class ScalarColumnReader : public BaseScalarColumnReader {
/// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
/// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
/// -fno-strict-alias).
- template <bool IN_COLLECTION, bool IS_DICT_ENCODED, bool NEEDS_CONVERSION>
- bool MaterializeValueBatch(MemPool* RESTRICT pool, int max_values, int tuple_size,
- uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
- DCHECK(MATERIALIZED || IN_COLLECTION);
- DCHECK_GT(num_buffered_values_, 0);
- DCHECK(def_levels_.CacheHasNext());
- if (IN_COLLECTION && pos_slot_desc_ != NULL) DCHECK(rep_levels_.CacheHasNext());
-
- uint8_t* curr_tuple = tuple_mem;
- int val_count = 0;
- while (def_levels_.CacheHasNext() && val_count < max_values) {
- Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
- int def_level = def_levels_.CacheGetNext();
-
- if (IN_COLLECTION) {
- if (def_level < def_level_of_immediate_repeated_ancestor()) {
- // A containing repeated field is empty or NULL. Skip the value but
- // move to the next repetition level if necessary.
- if (pos_slot_desc_ != NULL) rep_levels_.CacheGetNext();
- continue;
- }
- if (pos_slot_desc_ != NULL) {
- int rep_level = rep_levels_.CacheGetNext();
- // Reset position counter if we are at the start of a new parent collection.
- if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
- void* pos_slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
- *reinterpret_cast<int64_t*>(pos_slot) = pos_current_value_++;
- }
- }
+ template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
+ bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+ int* RESTRICT num_values) RESTRICT;
- if (MATERIALIZED) {
- if (def_level >= max_def_level()) {
- bool continue_execution =
- ReadSlot<IS_DICT_ENCODED, NEEDS_CONVERSION>(tuple, pool);
- if (UNLIKELY(!continue_execution)) return false;
- } else {
- tuple->SetNull(null_indicator_offset_);
- }
- }
- curr_tuple += tuple_size;
- ++val_count;
- }
- *num_values = val_count;
- return true;
- }
-
- // Dispatch to the correct templated implementation of MaterializeValueBatch based
- // on NeedsConversionInline().
- template <bool IN_COLLECTION, bool IS_DICT_ENCODED>
- bool MaterializeValueBatch(MemPool* RESTRICT pool, int max_values, int tuple_size,
- uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
- if (NeedsConversionInline()) {
- return MaterializeValueBatch<IN_COLLECTION, IS_DICT_ENCODED, true>(
- pool, max_values, tuple_size, tuple_mem, num_values);
- } else {
- return MaterializeValueBatch<IN_COLLECTION, IS_DICT_ENCODED, false>(
- pool, max_values, tuple_size, tuple_mem, num_values);
- }
- }
+ /// Same as above, but dispatches to the appropriate templated implementation of
+ /// MaterializeValueBatch() based on 'page_encoding_' and NeedsConversionInline().
+ template <bool IN_COLLECTION>
+ bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+ int* RESTRICT num_values) RESTRICT;
virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
- DictDecoderBase** decoder) {
+ DictDecoderBase** decoder) override {
if (!dict_decoder_.template Reset<PARQUET_TYPE>(values, size, fixed_len_size_)) {
return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
slot_desc_->type().DebugString(), "could not decode dictionary");
@@ -461,78 +298,36 @@ class ScalarColumnReader : public BaseScalarColumnReader {
return Status::OK();
}
- virtual bool HasDictionaryDecoder() {
+ virtual bool HasDictionaryDecoder() override {
return dict_decoder_init_;
}
- virtual void ClearDictionaryDecoder() {
+ virtual void ClearDictionaryDecoder() override {
dict_decoder_init_ = false;
}
- virtual Status InitDataPage(uint8_t* data, int size) {
- // Data can be empty if the column contains all NULLs
- DCHECK_GE(size, 0);
- page_encoding_ = current_page_header_.data_page_header.encoding;
- if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
- page_encoding_ != parquet::Encoding::PLAIN) {
- return GetUnsupportedDecodingError();
- }
-
- // If slot_desc_ is NULL, dict_decoder_ is uninitialized
- if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY && slot_desc_ != NULL) {
- if (!dict_decoder_init_) {
- return Status("File corrupt. Missing dictionary page.");
- }
- RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
- }
-
- // TODO: Perform filter selectivity checks here.
- return Status::OK();
- }
+ virtual Status InitDataPage(uint8_t* data, int size) override;
private:
- /// Writes the next value into the appropriate destination slot in 'tuple' using pool
- /// if necessary.
- ///
- /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
- /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
- /// true.
+ /// Writes the next value into the appropriate destination slot in 'tuple'. Returns
+ /// false if execution should be aborted for some reason, e.g. parse_error_ is set, the
+ /// query is cancelled, or the scan node limit was reached. Otherwise returns true.
///
/// Force inlining - GCC does not always inline this into hot loops.
- template <bool IS_DICT_ENCODED, bool NEEDS_CONVERSION>
- ALWAYS_INLINE bool ReadSlot(Tuple* tuple, MemPool* pool) {
- void* slot = tuple->GetSlot(tuple_offset_);
- // Use an uninitialized stack allocation for temporary value to avoid running
- // constructors doing work unnecessarily, e.g. if T == StringValue.
- alignas(InternalType) uint8_t val_buf[sizeof(InternalType)];
- InternalType* val_ptr = reinterpret_cast<InternalType*>(
- NEEDS_CONVERSION ? val_buf : slot);
- if (IS_DICT_ENCODED) {
- DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
- if (UNLIKELY(!dict_decoder_.GetNextValue(val_ptr))) {
- SetDictDecodeError();
- return false;
- }
- } else {
- DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN);
- int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
- data_,data_end_,fixed_len_size_, val_ptr);
- if (UNLIKELY(encoded_len < 0)) {
- SetPlainDecodeError();
- return false;
- }
- data_ += encoded_len;
- }
+ template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
+ inline ALWAYS_INLINE bool ReadSlot(Tuple* tuple);
- if (UNLIKELY(NeedsValidationInline() && !ValidateSlot(val_ptr, tuple))) {
- return false;
- }
- if (NEEDS_CONVERSION && !tuple->IsNull(null_indicator_offset_)
- && UNLIKELY(!ConvertSlot(val_ptr, slot, pool))) {
- return false;
- }
- return true;
- }
+ /// Reads position into 'pos' and updates 'pos_current_value_' based on 'rep_level'.
+ /// This helper is only used on the batched decoding path where we need to reset
+ /// 'pos_current_value_' to 0 based on 'rep_level'.
+ inline ALWAYS_INLINE void ReadPositionBatched(int16_t rep_level, int64_t* pos);
+
+ /// Decode one value from *data into 'val' and advance *data. 'data_end' is one byte
+ /// past the end of the buffer. Return false and set 'parse_error_' if there is an
+ /// error decoding the value.
+ template <Encoding::type ENCODING>
+ inline ALWAYS_INLINE bool DecodeValue(uint8_t** data, const uint8_t* data_end,
+ InternalType* RESTRICT val) RESTRICT;
/// Most column readers never require conversion, so we can avoid branches by
/// returning constant false. Column readers for types that require conversion
@@ -551,14 +346,15 @@ class ScalarColumnReader : public BaseScalarColumnReader {
}
/// Converts and writes 'src' into 'slot' based on desc_->type()
- bool ConvertSlot(const InternalType* src, void* slot, MemPool* pool) {
+ bool ConvertSlot(const InternalType* src, void* slot) {
DCHECK(false);
return false;
}
- /// Sets error message and returns false if the slot value is invalid, e.g., due to
- /// being out of the valid value range.
- bool ValidateSlot(InternalType* src, Tuple* tuple) const {
+ /// Checks if 'val' is invalid, e.g. due to being out of the valid value range. If it
+ /// is invalid, logs the error and returns false. If the error should stop execution,
+ /// sets 'parent_->parse_status_'.
+ bool ValidateValue(InternalType* val) const {
DCHECK(false);
return false;
}
@@ -578,26 +374,301 @@ class ScalarColumnReader : public BaseScalarColumnReader {
DictDecoder<InternalType> dict_decoder_;
/// True if dict_decoder_ has been initialized with a dictionary page.
- bool dict_decoder_init_;
+ bool dict_decoder_init_ = false;
/// true if decoded values must be converted before being written to an output tuple.
- bool needs_conversion_;
+ bool needs_conversion_ = false;
/// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
/// the max length for VARCHAR columns. Unused otherwise.
int fixed_len_size_;
};
-template<>
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader(
+ HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc)
+ : BaseScalarColumnReader(parent, node, slot_desc),
+ dict_decoder_(parent->scan_node_->mem_tracker()) {
+ if (!MATERIALIZED) {
+ // We're not materializing any values, just counting them. No need (or ability) to
+ // initialize state used to materialize values.
+ DCHECK(slot_desc_ == nullptr);
+ return;
+ }
+
+ DCHECK(slot_desc_ != nullptr);
+ DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
+ if (slot_desc_->type().type == TYPE_DECIMAL
+ && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+ fixed_len_size_ = node.element->type_length;
+ } else if (slot_desc_->type().type == TYPE_VARCHAR) {
+ fixed_len_size_ = slot_desc_->type().len;
+ } else {
+ fixed_len_size_ = -1;
+ }
+ needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
+ // TODO: Add logic to detect file versions that have unconverted TIMESTAMP
+ // values. Currently all versions have converted values.
+ (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
+ slot_desc_->type().type == TYPE_TIMESTAMP &&
+ parent->file_version_.application == "parquet-mr");
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPage(
+ uint8_t* data, int size) {
+ // Data can be empty if the column contains all NULLs
+ DCHECK_GE(size, 0);
+ page_encoding_ = current_page_header_.data_page_header.encoding;
+ if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
+ page_encoding_ != parquet::Encoding::PLAIN) {
+ return GetUnsupportedDecodingError();
+ }
+
+ // If slot_desc_ is NULL, we don't need to decode any values, so dict_decoder_ does
+ // not need to be initialized.
+ if (page_encoding_ == Encoding::PLAIN_DICTIONARY && slot_desc_ != nullptr) {
+ if (!dict_decoder_init_) {
+ return Status("File corrupt. Missing dictionary page.");
+ }
+ RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
+ }
+ // TODO: Perform filter selectivity checks here.
+ return Status::OK();
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValue(
+ Tuple* tuple) {
+ // NextLevels() should have already been called and def and rep levels should be in
+ // valid range.
+ DCHECK_GE(rep_level_, 0);
+ DCHECK_GE(def_level_, 0);
+ DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+ "Caller should have called NextLevels() until we are ready to read a value";
+
+ if (MATERIALIZED) {
+ if (def_level_ >= max_def_level()) {
+ bool continue_execution;
+ if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+ continue_execution = NeedsConversionInline() ?
+ ReadSlot<Encoding::PLAIN_DICTIONARY, true>(tuple) :
+ ReadSlot<Encoding::PLAIN_DICTIONARY, false>(tuple);
+ } else {
+ DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+ continue_execution = NeedsConversionInline() ?
+ ReadSlot<Encoding::PLAIN, true>(tuple) :
+ ReadSlot<Encoding::PLAIN, false>(tuple);
+ }
+ if (!continue_execution) return false;
+ } else {
+ tuple->SetNull(null_indicator_offset_);
+ }
+ }
+ return NextLevels<IN_COLLECTION>();
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatch(
+ int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+ int* RESTRICT num_values) RESTRICT {
+ // Repetition level is only present if this column is nested in a collection type.
+ if (IN_COLLECTION) {
+ DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
+ } else {
+ DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
+ }
+
+ int val_count = 0;
+ bool continue_execution = true;
+ while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+ DCHECK_GE(num_buffered_values_, 0);
+ // Read next page if necessary.
+ if (num_buffered_values_ == 0) {
+ if (!NextPage()) {
+ continue_execution = parent_->parse_status_.ok();
+ continue;
+ }
+ }
+
+ // Not materializing anything - skip decoding any levels and rely on the value
+ // count from page metadata to return the correct number of rows.
+ if (!MATERIALIZED && !IN_COLLECTION) {
+ int vals_to_add = min(num_buffered_values_, max_values - val_count);
+ val_count += vals_to_add;
+ num_buffered_values_ -= vals_to_add;
+ continue;
+ }
+ // Fill the rep level cache if needed. We are flattening out the fields of the
+ // nested collection into the top-level tuple returned by the scan, so we don't
+ // care about the nesting structure unless the position slot is being populated.
+ if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) {
+ parent_->parse_status_.MergeStatus(
+ rep_levels_.CacheNextBatch(num_buffered_values_));
+ if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+ }
+
+ // Fill def level cache if needed.
+ if (!def_levels_.CacheHasNext()) {
+ // TODO: add a fast path here if there's a run of repeated values.
+ parent_->parse_status_.MergeStatus(
+ def_levels_.CacheNextBatch(num_buffered_values_));
+ if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+ }
+
+ // Read data page and cached levels to materialize values.
+ uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
+ int remaining_val_capacity = max_values - val_count;
+ int ret_val_count = 0;
+ continue_execution = MaterializeValueBatch<IN_COLLECTION>(
+ remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+ val_count += ret_val_count;
+ if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
+ continue_execution &= ColReaderDebugAction(&val_count);
+ }
+ }
+ *num_values = val_count;
+ return continue_execution;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
+ int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+ int* RESTRICT num_values) RESTRICT {
+ DCHECK(MATERIALIZED || IN_COLLECTION);
+ DCHECK_GT(num_buffered_values_, 0);
+ DCHECK(def_levels_.CacheHasNext());
+ if (IN_COLLECTION && pos_slot_desc_ != nullptr) DCHECK(rep_levels_.CacheHasNext());
+ const int cache_start_idx = def_levels_.CacheCurrIdx();
+ uint8_t* curr_tuple = tuple_mem;
+ int val_count = 0;
+ while (def_levels_.CacheHasNext() && val_count < max_values) {
+ Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+ int def_level = def_levels_.CacheGetNext();
+
+ if (IN_COLLECTION) {
+ if (def_level < def_level_of_immediate_repeated_ancestor()) {
+ // A containing repeated field is empty or NULL. Skip the value but
+ // move to the next repetition level if necessary.
+ if (pos_slot_desc_ != nullptr) rep_levels_.CacheSkipLevels(1);
+ continue;
+ }
+ if (pos_slot_desc_ != nullptr) {
+ ReadPositionBatched(rep_levels_.CacheGetNext(),
+ tuple->GetBigIntSlot(pos_slot_desc_->tuple_offset()));
+ }
+ }
+
+ if (MATERIALIZED) {
+ if (def_level >= max_def_level()) {
+ bool continue_execution = ReadSlot<ENCODING, NEEDS_CONVERSION>(tuple);
+ if (UNLIKELY(!continue_execution)) return false;
+ } else {
+ tuple->SetNull(null_indicator_offset_);
+ }
+ }
+ curr_tuple += tuple_size;
+ ++val_count;
+ }
+ num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
+ *num_values = val_count;
+ return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
+ int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+ int* RESTRICT num_values) RESTRICT {
+ // Dispatch to the correct templated implementation of MaterializeValueBatch().
+ if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+ if (NeedsConversionInline()) {
+ return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, true>(
+ max_values, tuple_size, tuple_mem, num_values);
+ } else {
+ return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, false>(
+ max_values, tuple_size, tuple_mem, num_values);
+ }
+ } else {
+ DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+ if (NeedsConversionInline()) {
+ return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, true>(
+ max_values, tuple_size, tuple_mem, num_values);
+ } else {
+ return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, false>(
+ max_values, tuple_size, tuple_mem, num_values);
+ }
+ }
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlot(
+ Tuple* RESTRICT tuple) RESTRICT {
+ void* slot = tuple->GetSlot(tuple_offset_);
+ // Use an uninitialized stack allocation for temporary value to avoid running
+ // constructors doing work unnecessarily, e.g. if T == StringValue.
+ alignas(InternalType) uint8_t val_buf[sizeof(InternalType)];
+ InternalType* val_ptr =
+ reinterpret_cast<InternalType*>(NEEDS_CONVERSION ? val_buf : slot);
+
+ if (UNLIKELY(!DecodeValue<ENCODING>(&data_, data_end_, val_ptr))) return false;
+ if (UNLIKELY(NeedsValidationInline() && !ValidateValue(val_ptr))) {
+ if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+ // The value is invalid but execution should continue - set the null indicator and
+ // skip conversion.
+ tuple->SetNull(null_indicator_offset_);
+ return true;
+ }
+ if (NEEDS_CONVERSION && UNLIKELY(!ConvertSlot(val_ptr, slot))) return false;
+ return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <Encoding::type ENCODING>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
+ uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
+ InternalType* RESTRICT val) RESTRICT {
+ DCHECK_EQ(page_encoding_, ENCODING);
+ if (ENCODING == Encoding::PLAIN_DICTIONARY) {
+ if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
+ SetDictDecodeError();
+ return false;
+ }
+ } else {
+ DCHECK_EQ(ENCODING, Encoding::PLAIN);
+ int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
+ *data, data_end, fixed_len_size_, val);
+ if (UNLIKELY(encoded_len < 0)) {
+ SetPlainDecodeError();
+ return false;
+ }
+ *data += encoded_len;
+ }
+ return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
+ ::ReadPositionBatched(int16_t rep_level, int64_t* pos) {
+ // Reset position counter if we are at the start of a new parent collection.
+ if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
+ *pos = pos_current_value_++;
+}
+
+template <>
inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>
::NeedsConversionInline() const {
return needs_conversion_;
}
-template<>
+template <>
bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSlot(
- const StringValue* src, void* slot, MemPool* pool) {
- DCHECK(slot_desc() != NULL);
+ const StringValue* src, void* slot) {
+ DCHECK(slot_desc() != nullptr);
DCHECK(slot_desc()->type().type == TYPE_CHAR);
int char_len = slot_desc()->type().len;
int unpadded_len = min(char_len, src->len);
@@ -607,15 +678,15 @@ bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSl
return true;
}
-template<>
+template <>
inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
::NeedsConversionInline() const {
return needs_conversion_;
}
-template<>
+template <>
bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot(
- const TimestampValue* src, void* slot, MemPool* pool) {
+ const TimestampValue* src, void* slot) {
// Conversion should only happen when this flag is enabled.
DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
@@ -624,24 +695,21 @@ bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot
return true;
}
-template<>
+template <>
inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
::NeedsValidationInline() const {
return true;
}
-template<>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateSlot(
- TimestampValue* src, Tuple* tuple) const {
- if (UNLIKELY(!TimestampValue::IsValidDate(src->date()))) {
+template <>
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateValue(
+ TimestampValue* val) const {
+ if (UNLIKELY(!TimestampValue::IsValidDate(val->date()))) {
ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE,
filename(), node_.element->name);
Status status = parent_->state_->LogOrReturnError(msg);
- if (!status.ok()) {
- parent_->parse_status_ = status;
- return false;
- }
- tuple->SetNull(null_indicator_offset_);
+ if (!status.ok()) parent_->parse_status_ = status;
+ return false;
}
return true;
}
@@ -651,107 +719,45 @@ class BoolColumnReader : public BaseScalarColumnReader {
BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
const SlotDescriptor* slot_desc)
: BaseScalarColumnReader(parent, node, slot_desc) {
- if (slot_desc_ != NULL) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
+ if (slot_desc_ != nullptr) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
}
virtual ~BoolColumnReader() { }
- virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
- return ReadValue<true>(pool, tuple);
+ virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
+ return ReadValue<true>(tuple);
}
- virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
- return ReadValue<false>(pool, tuple);
+ virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
+ return ReadValue<false>(tuple);
}
protected:
virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
- DictDecoderBase** decoder) {
+ DictDecoderBase** decoder) override {
DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
<< "have gotten this far.";
return Status::OK();
}
- virtual bool HasDictionaryDecoder() {
+ virtual bool HasDictionaryDecoder() override {
// Decoder should never be created for bools.
return false;
}
- virtual void ClearDictionaryDecoder() { }
+ virtual void ClearDictionaryDecoder() override { }
- virtual Status InitDataPage(uint8_t* data, int size) {
- page_encoding_ = current_page_header_.data_page_header.encoding;
- // Only the relevant decoder is initialized for a given data page.
- switch (page_encoding_) {
- case parquet::Encoding::PLAIN:
- bool_values_.Reset(data, size);
- break;
- case parquet::Encoding::RLE:
- // The first 4 bytes contain the size of the encoded data. This information is
- // redundant, as this is the last part of the data page, and the number of
- // remaining bytes is already known.
- rle_decoder_.Reset(data + 4, size - 4, 1);
- break;
- default:
- return GetUnsupportedDecodingError();
- }
- num_unpacked_values_ = 0;
- unpacked_value_idx_ = 0;
- return Status::OK();
- }
+ virtual Status InitDataPage(uint8_t* data, int size) override;
private:
- template<bool IN_COLLECTION>
- inline bool ReadValue(MemPool* pool, Tuple* tuple) {
- DCHECK(slot_desc_ != NULL);
- // Def and rep levels should be in valid range.
- DCHECK_GE(rep_level_, 0);
- DCHECK_GE(def_level_, 0);
- DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
- "Caller should have called NextLevels() until we are ready to read a value";
-
- if (def_level_ >= max_def_level()) {
- return ReadSlot<IN_COLLECTION>(tuple, pool);
- } else {
- // Null value
- tuple->SetNull(null_indicator_offset_);
- return NextLevels<IN_COLLECTION>();
- }
- }
-
- /// Writes the next value into the next slot in the *tuple using pool if necessary.
- /// Also advances def_level_ and rep_level_ via NextLevels().
- ///
- /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
- /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
- /// true.
- template<bool IN_COLLECTION>
- inline bool ReadSlot(Tuple* tuple, MemPool* pool) {
- void* slot = tuple->GetSlot(tuple_offset_);
- bool val;
- if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
- val = unpacked_values_[unpacked_value_idx_++];
- } else {
- // Unpack as many values as we can into the buffer. We expect to read at least one
- // value.
- if (page_encoding_ == parquet::Encoding::PLAIN) {
- num_unpacked_values_ =
- bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
- } else {
- num_unpacked_values_ =
- rle_decoder_.GetValues(UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
- }
+ template <bool IN_COLLECTION>
+ inline bool ReadValue(Tuple* tuple);
- if (UNLIKELY(num_unpacked_values_ == 0)) {
- parent_->parse_status_ = Status("Invalid bool column.");
- return false;
- }
- val = unpacked_values_[0];
- unpacked_value_idx_ = 1;
- }
- *reinterpret_cast<bool*>(slot) = val;
- return NextLevels<IN_COLLECTION>();
- }
+ /// Decodes the next value into 'value'. Returns false and sets
+ /// 'parent_->parse_status_' if an error is encountered decoding the
+ /// value. Otherwise returns true.
+ template <bool IN_COLLECTION>
+ inline bool DecodeValue(bool* value);
/// A buffer to store unpacked values. Must be a multiple of 32 size to use the
/// batch-oriented interface of BatchedBitReader.
@@ -771,6 +777,71 @@ class BoolColumnReader : public BaseScalarColumnReader {
RleBatchDecoder<bool> rle_decoder_;
};
+Status BoolColumnReader::InitDataPage(uint8_t* data, int size) {
+ page_encoding_ = current_page_header_.data_page_header.encoding;
+ // Only the relevant decoder is initialized for a given data page.
+ switch (page_encoding_) {
+ case parquet::Encoding::PLAIN:
+ bool_values_.Reset(data, size);
+ break;
+ case parquet::Encoding::RLE:
+ // The first 4 bytes contain the size of the encoded data. This information is
+ // redundant, as this is the last part of the data page, and the number of
+ // remaining bytes is already known.
+ rle_decoder_.Reset(data + 4, size - 4, 1);
+ break;
+ default:
+ return GetUnsupportedDecodingError();
+ }
+ num_unpacked_values_ = 0;
+ unpacked_value_idx_ = 0;
+ return Status::OK();
+}
+
+template <bool IN_COLLECTION>
+bool BoolColumnReader::ReadValue(Tuple* tuple) {
+ DCHECK(slot_desc_ != nullptr);
+ // Def and rep levels should be in valid range.
+ DCHECK_GE(rep_level_, 0);
+ DCHECK_GE(def_level_, 0);
+ DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+ "Caller should have called NextLevels() until we are ready to read a value";
+
+ if (def_level_ >= max_def_level()) {
+ bool* slot = tuple->GetBoolSlot(tuple_offset_);
+ if (UNLIKELY(!DecodeValue<IN_COLLECTION>(slot))) return false;
+ } else {
+ // Null value
+ tuple->SetNull(null_indicator_offset_);
+ }
+ return NextLevels<IN_COLLECTION>();
+}
+
+template <bool IN_COLLECTION>
+bool BoolColumnReader::DecodeValue(bool* value) {
+ if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
+ *value = unpacked_values_[unpacked_value_idx_++];
+ } else {
+ // Unpack as many values as we can into the buffer. We expect to read at least one
+ // value.
+ if (page_encoding_ == parquet::Encoding::PLAIN) {
+ num_unpacked_values_ =
+ bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+ } else {
+ num_unpacked_values_ =
+ rle_decoder_.GetValues(UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+ }
+
+ if (UNLIKELY(num_unpacked_values_ == 0)) {
+ parent_->parse_status_ = Status("Invalid bool column.");
+ return false;
+ }
+ *value = unpacked_values_[0];
+ unpacked_value_idx_ = 1;
+ }
+ return true;
+}
+
// Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
// path doesn't falsely report that the file is corrupted.
bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
@@ -802,7 +873,9 @@ bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
continue;
}
// Fill in position slot if applicable
- if (pos_slot_desc_ != NULL) ReadPosition(tuple);
+ if (pos_slot_desc_ != nullptr) {
+ ReadPositionNonBatched(tuple->GetBigIntSlot(pos_slot_desc()->tuple_offset()));
+ }
continue_execution = ReadValue(pool, tuple);
++val_count;
if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
@@ -834,17 +907,14 @@ bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
return continue_execution;
}
-void ParquetColumnReader::ReadPosition(Tuple* tuple) {
- DCHECK(pos_slot_desc() != NULL);
+void ParquetColumnReader::ReadPositionNonBatched(int64_t* pos) {
// NextLevels() should have already been called
DCHECK_GE(rep_level_, 0);
DCHECK_GE(def_level_, 0);
DCHECK_GE(pos_current_value_, 0);
DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
"Caller should have called NextLevels() until we are ready to read a value";
-
- void* slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
- *reinterpret_cast<int64_t*>(slot) = pos_current_value_++;
+ *pos = pos_current_value_++;
}
// In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
@@ -1068,8 +1138,8 @@ Status BaseScalarColumnReader::InitDictionary() {
}
}
if (dict_header != nullptr &&
- dict_header->encoding != parquet::Encoding::PLAIN &&
- dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) {
+ dict_header->encoding != Encoding::PLAIN &&
+ dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
"for dictionary pages.");
}
@@ -1223,7 +1293,7 @@ Status BaseScalarColumnReader::ReadDataPage() {
num_values_read_ += num_buffered_values_;
int uncompressed_size = current_page_header_.uncompressed_page_size;
- if (decompressor_.get() != NULL) {
+ if (decompressor_.get() != nullptr) {
SCOPED_TIMER(parent_->decompress_timer_);
uint8_t* decompressed_buffer;
RETURN_IF_ERROR(AllocateUncompressedDataPage(
@@ -1294,7 +1364,7 @@ Status BaseScalarColumnReader::AllocateUncompressedDataPage(int64_t size,
return Status::OK();
}
-template<bool ADVANCE_REP_LEVEL>
+template <bool ADVANCE_REP_LEVEL>
bool BaseScalarColumnReader::NextLevels() {
if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
@@ -1382,7 +1452,7 @@ bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
if (tuple_offset_ == -1) {
return CollectionColumnReader::NextLevels();
} else if (def_level_ >= max_def_level()) {
- return ReadSlot(tuple, pool);
+ return ReadSlot(tuple->GetCollectionSlot(tuple_offset_), pool);
} else {
// Null value
tuple->SetNull(null_indicator_offset_);
@@ -1390,21 +1460,18 @@ bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
}
}
-bool CollectionColumnReader::ReadNonRepeatedValue(
- MemPool* pool, Tuple* tuple) {
+bool CollectionColumnReader::ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
return CollectionColumnReader::ReadValue(pool, tuple);
}
-bool CollectionColumnReader::ReadSlot(Tuple* tuple, MemPool* pool) {
+bool CollectionColumnReader::ReadSlot(CollectionValue* slot, MemPool* pool) {
DCHECK(!children_.empty());
DCHECK_LE(rep_level_, new_collection_rep_level());
// Recursively read the collection into a new CollectionValue.
- CollectionValue* coll_slot = reinterpret_cast<CollectionValue*>(
- tuple->GetSlot(tuple_offset_));
- *coll_slot = CollectionValue();
+ *slot = CollectionValue();
CollectionValueBuilder builder(
- coll_slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
+ slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
bool continue_execution = parent_->AssembleCollection(
children_, new_collection_rep_level(), &builder);
if (!continue_execution) return false;
@@ -1483,11 +1550,11 @@ static ParquetColumnReader* GetDecimalColumnReader(const SchemaNode& node,
ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
bool is_collection_field, const SlotDescriptor* slot_desc,
HdfsParquetScanner* parent) {
- ParquetColumnReader* reader = NULL;
+ ParquetColumnReader* reader = nullptr;
if (is_collection_field) {
// Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
reader = new CollectionColumnReader(parent, node, slot_desc);
- } else if (slot_desc != NULL) {
+ } else if (slot_desc != nullptr) {
// Create the appropriate ScalarColumnReader type to read values into 'slot_desc'
switch (slot_desc->type().type) {
case TYPE_BOOLEAN:
http://git-wip-us.apache.org/repos/asf/impala/blob/13604bd4/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 28138bd..e720375 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -232,10 +232,12 @@ class ParquetColumnReader {
/// true.
virtual bool NextLevels() = 0;
- /// Should only be called if pos_slot_desc_ is non-NULL. Writes pos_current_value_ to
- /// 'tuple' (i.e. "reads" the synthetic position field of the parent collection into
- /// 'tuple') and increments pos_current_value_.
- void ReadPosition(Tuple* tuple);
+ /// Writes pos_current_value_ (i.e. "reads" the synthetic position field of the
+ /// parent collection) to 'pos' and increments pos_current_value_. Only valid to
+ /// call when doing non-batched reading, i.e. NextLevels() must have been called
+ /// before each call to this function to advance to the next element in the
+ /// collection.
+ void ReadPositionNonBatched(int64_t* pos);
/// Returns true if this column reader has reached the end of the row group.
inline bool RowGroupAtEnd() {
@@ -251,7 +253,7 @@ class ParquetColumnReader {
protected:
HdfsParquetScanner* parent_;
const SchemaNode& node_;
- const SlotDescriptor* slot_desc_;
+ const SlotDescriptor* const slot_desc_;
/// The slot descriptor for the position field of the tuple, if there is one. NULL if
/// there's not. Only one column reader for a given tuple desc will have this set.
@@ -272,17 +274,17 @@ class ParquetColumnReader {
/// INVALID_LEVEL and ROW_GROUP_END. The maximum values are cached here because they
/// are accessed in inner loops.
int16_t rep_level_;
- int16_t max_rep_level_;
+ const int16_t max_rep_level_;
int16_t def_level_;
- int16_t max_def_level_;
+ const int16_t max_def_level_;
// Cache frequently accessed members of slot_desc_ for perf.
/// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL.
- int tuple_offset_;
+ const int tuple_offset_;
/// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL.
- NullIndicatorOffset null_indicator_offset_;
+ const NullIndicatorOffset null_indicator_offset_;
ParquetColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
const SlotDescriptor* slot_desc)
@@ -579,13 +581,12 @@ class CollectionColumnReader : public ParquetColumnReader {
void UpdateDerivedState();
/// Recursively reads from children_ to assemble a single CollectionValue into
- /// the appropriate destination slot in 'tuple'. Also advances rep_level_ and
- /// def_level_ via NextLevels().
+ /// 'slot'. Also advances rep_level_ and def_level_ via NextLevels().
///
/// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
/// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
/// true.
- inline bool ReadSlot(Tuple* tuple, MemPool* pool);
+ inline bool ReadSlot(CollectionValue* slot, MemPool* pool);
};
}
http://git-wip-us.apache.org/repos/asf/impala/blob/13604bd4/be/src/runtime/tuple.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index ae9be32..68f313d 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -243,6 +243,14 @@ class Tuple {
return reinterpret_cast<const char*>(this) + offset;
}
+ bool* GetBoolSlot(int offset) {
+ return static_cast<bool*>(GetSlot(offset));
+ }
+
+ int64_t* GetBigIntSlot(int offset) {
+ return static_cast<int64_t*>(GetSlot(offset));
+ }
+
StringValue* GetStringSlot(int offset) {
DCHECK(offset != -1); // -1 offset indicates non-materialized slot
return reinterpret_cast<StringValue*>(reinterpret_cast<char*>(this) + offset);
http://git-wip-us.apache.org/repos/asf/impala/blob/13604bd4/be/src/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h
index bac127a..e527dc8 100644
--- a/be/src/util/bit-stream-utils.h
+++ b/be/src/util/bit-stream-utils.h
@@ -114,6 +114,8 @@ class BatchedBitReader {
/// Resets the read to start reading from the start of 'buffer'. The buffer's
/// length is 'buffer_len'. Does not take ownership of the buffer.
void Reset(const uint8_t* buffer, int64_t buffer_len) {
+ DCHECK(buffer != nullptr);
+ DCHECK_GE(buffer_len, 0);
buffer_pos_ = buffer;
buffer_end_ = buffer + buffer_len;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/13604bd4/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 83b6315..1491066 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -471,6 +471,8 @@ inline void RleEncoder::Clear() {
template <typename T>
inline void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) {
+ DCHECK(buffer != nullptr);
+ DCHECK_GE(buffer_len, 0);
DCHECK_GE(bit_width, 0);
DCHECK_LE(bit_width, BatchedBitReader::MAX_BITWIDTH);
bit_reader_.Reset(buffer, buffer_len);