You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/05/03 20:18:06 UTC

[2/2] impala git commit: IMPALA-4123 (prep): Parquet column reader cleanup

IMPALA-4123 (prep): Parquet column reader cleanup

Some miscellaneous cleanup to make it easier to understand and
make future changes to the Parquet scanner.

A lot of the refactoring is about more cleanly separating functions
so that they have clearer purpose, e.g.:
* Functions that strictly do decoding, i.e. materialize values, convert
  and validate them. These are changed to operate on single values, not tuples.
* Functions that are used for the non-batched decoding path (i.e. driven
  by CollectionColumnReader or BoolColumnReader).
* Functions that dispatch to a templated implementation based on one or
  more runtime values.

Other misc changes:
* Move large functions out of class bodies.
* Use parquet::Encoding instead of bool to indicate encoding.
* Add some additional DCHECKs.

Testing:
* Ran exhaustive tests
* Ran fuzz test in a loop

Change-Id: Ibc00352df3a0b2d605f872ae7e43db2dc90faab1
Reviewed-on: http://gerrit.cloudera.org:8080/9799
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/51bc004d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/51bc004d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/51bc004d

Branch: refs/heads/master
Commit: 51bc004d798d2c7ab7b8b4553d32d26cb7382ad6
Parents: aee045d
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Oct 11 23:58:19 2017 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu May 3 19:18:09 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc   |  10 +-
 be/src/exec/parquet-column-readers.cc | 833 ++++++++++++++++-------------
 be/src/exec/parquet-column-readers.h  |  25 +-
 be/src/runtime/tuple.h                |   8 +
 be/src/util/bit-stream-utils.h        |   2 +
 be/src/util/rle-encoding.h            |   2 +
 6 files changed, 482 insertions(+), 398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/51bc004d/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index b40816d..8cec3d2 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -362,8 +362,8 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
       Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
       TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
       InitTuple(template_tuple_, dst_tuple);
-      int64_t* dst_slot = reinterpret_cast<int64_t*>(dst_tuple->GetSlot(
-          scan_node_->parquet_count_star_slot_offset()));
+      int64_t* dst_slot =
+          dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset());
       *dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows;
       row_group_rows_read_ += *dst_slot;
       dst_row->SetTuple(0, dst_tuple);
@@ -1142,7 +1142,11 @@ inline bool HdfsParquetScanner::ReadCollectionItem(
       FILE_CHECK_GE(col_reader->def_level(),
                     col_reader->def_level_of_immediate_repeated_ancestor());
       // Fill in position slot if applicable
-      if (col_reader->pos_slot_desc() != nullptr) col_reader->ReadPosition(tuple);
+      const SlotDescriptor* pos_slot_desc = col_reader->pos_slot_desc();
+      if (pos_slot_desc != nullptr) {
+        col_reader->ReadPositionNonBatched(
+            tuple->GetBigIntSlot(pos_slot_desc->tuple_offset()));
+      }
       continue_execution = col_reader->ReadValue(pool, tuple);
     } else {
       // A containing repeated field is empty or NULL

http://git-wip-us.apache.org/repos/asf/impala/blob/51bc004d/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index f5b73c9..add9702 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -73,14 +73,18 @@ static int debug_count = 0;
 
 using namespace impala::io;
 
+using parquet::Encoding;
+
 namespace impala {
 
 const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
     "ParquetColumnReader::$0() failed to allocate $1 bytes for $2.";
 
 Status ParquetLevelDecoder::Init(const string& filename,
-    parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
+    Encoding::type encoding, MemPool* cache_pool, int cache_size,
     int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
+  DCHECK(*data != nullptr);
+  DCHECK_GE(*data_size, 0);
   DCHECK_GE(num_buffered_values, 0);
   DCHECK_GT(cache_size, 0);
   cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
@@ -95,7 +99,7 @@ Status ParquetLevelDecoder::Init(const string& filename,
 
   int32_t num_bytes = 0;
   switch (encoding) {
-    case parquet::Encoding::RLE: {
+    case Encoding::RLE: {
       Status status;
       if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) {
         return status;
@@ -128,15 +132,15 @@ Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) {
   num_cached_levels_ = 0;
   cached_level_idx_ = 0;
   // Memory has already been allocated.
-  if (cached_levels_ != NULL) {
+  if (cached_levels_ != nullptr) {
     DCHECK_EQ(cache_size_, cache_size);
     return Status::OK();
   }
 
   cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
-  if (cached_levels_ == NULL) {
+  if (cached_levels_ == nullptr) {
     return pool->mem_tracker()->MemLimitExceeded(
-        NULL, "Definition level cache", cache_size);
+        nullptr, "Definition level cache", cache_size);
   }
   memset(cached_levels_, 0, cache_size);
   cache_size_ = cache_size;
@@ -198,98 +202,41 @@ bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) {
 /// If MATERIALIZED is true, the column values are materialized into the slot described
 /// by slot_desc. If MATERIALIZED is false, the column values are not materialized, but
 /// the position can be accessed.
-template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 class ScalarColumnReader : public BaseScalarColumnReader {
  public:
   ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : BaseScalarColumnReader(parent, node, slot_desc),
-      dict_decoder_(parent->scan_node_->mem_tracker()),
-      dict_decoder_init_(false),
-      needs_conversion_(false) {
-    if (!MATERIALIZED) {
-      // We're not materializing any values, just counting them. No need (or ability) to
-      // initialize state used to materialize values.
-      DCHECK(slot_desc_ == NULL);
-      return;
-    }
-
-    DCHECK(slot_desc_ != NULL);
-    DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
-    if (slot_desc_->type().type == TYPE_DECIMAL
-        && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
-      fixed_len_size_ = node.element->type_length;
-    } else if (slot_desc_->type().type == TYPE_VARCHAR) {
-      fixed_len_size_ = slot_desc_->type().len;
-    } else {
-      fixed_len_size_ = -1;
-    }
-    needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
-        // TODO: Add logic to detect file versions that have unconverted TIMESTAMP
-        // values. Currently all versions have converted values.
-        (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
-        slot_desc_->type().type == TYPE_TIMESTAMP &&
-        parent->file_version_.application == "parquet-mr");
-  }
-
+      const SlotDescriptor* slot_desc);
   virtual ~ScalarColumnReader() { }
 
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<true>(pool, tuple);
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
+    return ReadValue<true>(tuple);
   }
 
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<false>(pool, tuple);
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
+    return ReadValue<false>(tuple);
   }
 
   virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) {
-    return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values);
+      uint8_t* tuple_mem, int* num_values) override {
+    return ReadValueBatch<true>(max_values, tuple_size, tuple_mem, num_values);
   }
 
   virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) {
-    return ReadValueBatch<false>(pool, max_values, tuple_size, tuple_mem, num_values);
+      uint8_t* tuple_mem, int* num_values) override {
+    return ReadValueBatch<false>(max_values, tuple_size, tuple_mem, num_values);
   }
 
-  virtual DictDecoderBase* GetDictionaryDecoder() {
+  virtual DictDecoderBase* GetDictionaryDecoder() override {
     return HasDictionaryDecoder() ? &dict_decoder_ : nullptr;
   }
 
-  virtual bool NeedsConversion() { return NeedsConversionInline(); }
-  virtual bool NeedsValidation() { return NeedsValidationInline(); }
+  virtual bool NeedsConversion() override { return NeedsConversionInline(); }
+  virtual bool NeedsValidation() override { return NeedsValidationInline(); }
 
  protected:
-  template<bool IN_COLLECTION>
-  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
-    // NextLevels() should have already been called and def and rep levels should be in
-    // valid range.
-    DCHECK_GE(rep_level_, 0);
-    DCHECK_GE(def_level_, 0);
-    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-        "Caller should have called NextLevels() until we are ready to read a value";
-
-    if (MATERIALIZED) {
-      if (def_level_ >= max_def_level()) {
-        if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
-          if (NeedsConversionInline()) {
-            if (!ReadSlot<true, true>(tuple, pool)) return false;
-          } else {
-            if (!ReadSlot<true, false>(tuple, pool)) return false;
-          }
-        } else {
-          if (NeedsConversionInline()) {
-            if (!ReadSlot<false, true>(tuple, pool)) return false;
-          } else {
-            if (!ReadSlot<false, false>(tuple, pool)) return false;
-          }
-        }
-      } else {
-        tuple->SetNull(null_indicator_offset_);
-      }
-    }
-    return NextLevels<IN_COLLECTION>();
-  }
+  template <bool IN_COLLECTION>
+  inline bool ReadValue(Tuple* tuple);
 
   /// Implementation of the ReadValueBatch() functions specialized for this
   /// column reader type. This function drives the reading of data pages and
@@ -302,74 +249,14 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
   /// -fno-strict-alias).
   template <bool IN_COLLECTION>
-  bool ReadValueBatch(MemPool* RESTRICT pool, int max_values, int tuple_size,
-      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
-    // Repetition level is only present if this column is nested in a collection type.
-    if (!IN_COLLECTION) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
-    if (IN_COLLECTION) DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
-
-    int val_count = 0;
-    bool continue_execution = true;
-    while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-      DCHECK_GE(num_buffered_values_, 0);
-      // Read next page if necessary.
-      if (num_buffered_values_ == 0) {
-        if (!NextPage()) {
-          continue_execution = parent_->parse_status_.ok();
-          continue;
-        }
-      }
-
-      // Not materializing anything - skip decoding any levels and rely on the value
-      // count from page metadata to return the correct number of rows.
-      if (!MATERIALIZED && !IN_COLLECTION) {
-        int vals_to_add = min(num_buffered_values_, max_values - val_count);
-        val_count += vals_to_add;
-        num_buffered_values_ -= vals_to_add;
-        continue;
-      }
-      // Fill the rep level cache if needed. We are flattening out the fields of the
-      // nested collection into the top-level tuple returned by the scan, so we don't
-      // care about the nesting structure unless the position slot is being populated.
-      if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) {
-        parent_->parse_status_.MergeStatus(
-            rep_levels_.CacheNextBatch(num_buffered_values_));
-        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-      }
-
-      // Fill def level cache if needed.
-      if (!def_levels_.CacheHasNext()) {
-        // TODO: add a fast path here if there's a run of repeated values.
-        parent_->parse_status_.MergeStatus(
-            def_levels_.CacheNextBatch(num_buffered_values_));
-        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-      }
-
-      // Read data page and cached levels to materialize values.
-      int cache_start_idx = def_levels_.CacheCurrIdx();
-      uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
-      int remaining_val_capacity = max_values - val_count;
-      int ret_val_count = 0;
-      if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
-        continue_execution = MaterializeValueBatch<IN_COLLECTION, true>(
-            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
-      } else {
-        continue_execution = MaterializeValueBatch<IN_COLLECTION, false>(
-            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
-      }
-      val_count += ret_val_count;
-      num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
-      if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
-        continue_execution &= ColReaderDebugAction(&val_count);
-      }
-    }
-    *num_values = val_count;
-    return continue_execution;
-  }
+  bool ReadValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+      int* RESTRICT num_values) RESTRICT;
 
   /// Helper function for ReadValueBatch() above that performs value materialization.
   /// It assumes a data page with remaining values is available, and that the def/rep
-  /// level caches have been populated.
+  /// level caches have been populated. Materializes values into 'tuple_mem' with a
+  /// stride of 'tuple_size' and updates 'num_buffered_values_'. Returns the number of
+  /// values materialized in 'num_values'.
   /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not
   /// handled in this function.
   /// Use RESTRICT so that the compiler knows that it is safe to cache member
@@ -377,68 +264,18 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
   /// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
   /// -fno-strict-alias).
-  template <bool IN_COLLECTION, bool IS_DICT_ENCODED, bool NEEDS_CONVERSION>
-  bool MaterializeValueBatch(MemPool* RESTRICT pool, int max_values, int tuple_size,
-      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
-    DCHECK(MATERIALIZED || IN_COLLECTION);
-    DCHECK_GT(num_buffered_values_, 0);
-    DCHECK(def_levels_.CacheHasNext());
-    if (IN_COLLECTION && pos_slot_desc_ != NULL) DCHECK(rep_levels_.CacheHasNext());
-
-    uint8_t* curr_tuple = tuple_mem;
-    int val_count = 0;
-    while (def_levels_.CacheHasNext() && val_count < max_values) {
-      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
-      int def_level = def_levels_.CacheGetNext();
-
-      if (IN_COLLECTION) {
-        if (def_level < def_level_of_immediate_repeated_ancestor()) {
-          // A containing repeated field is empty or NULL. Skip the value but
-          // move to the next repetition level if necessary.
-          if (pos_slot_desc_ != NULL) rep_levels_.CacheGetNext();
-          continue;
-        }
-        if (pos_slot_desc_ != NULL) {
-          int rep_level = rep_levels_.CacheGetNext();
-          // Reset position counter if we are at the start of a new parent collection.
-          if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
-          void* pos_slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
-          *reinterpret_cast<int64_t*>(pos_slot) = pos_current_value_++;
-        }
-      }
+  template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
+  bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+      int* RESTRICT num_values) RESTRICT;
 
-      if (MATERIALIZED) {
-        if (def_level >= max_def_level()) {
-          bool continue_execution =
-              ReadSlot<IS_DICT_ENCODED, NEEDS_CONVERSION>(tuple, pool);
-          if (UNLIKELY(!continue_execution)) return false;
-        } else {
-          tuple->SetNull(null_indicator_offset_);
-        }
-      }
-      curr_tuple += tuple_size;
-      ++val_count;
-    }
-    *num_values = val_count;
-    return true;
-  }
-
-  // Dispatch to the correct templated implementation of MaterializeValueBatch based
-  // on NeedsConversionInline().
-  template <bool IN_COLLECTION, bool IS_DICT_ENCODED>
-  bool MaterializeValueBatch(MemPool* RESTRICT pool, int max_values, int tuple_size,
-      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
-    if (NeedsConversionInline()) {
-      return MaterializeValueBatch<IN_COLLECTION, IS_DICT_ENCODED, true>(
-          pool, max_values, tuple_size, tuple_mem, num_values);
-    } else {
-      return MaterializeValueBatch<IN_COLLECTION, IS_DICT_ENCODED, false>(
-          pool, max_values, tuple_size, tuple_mem, num_values);
-    }
-  }
+  /// Same as above, but dispatches to the appropriate templated implementation of
+  /// MaterializeValueBatch() based on 'page_encoding_' and NeedsConversionInline().
+  template <bool IN_COLLECTION>
+  bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+      int* RESTRICT num_values) RESTRICT;
 
   virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) {
+      DictDecoderBase** decoder) override {
     if (!dict_decoder_.template Reset<PARQUET_TYPE>(values, size, fixed_len_size_)) {
         return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
             slot_desc_->type().DebugString(), "could not decode dictionary");
@@ -448,78 +285,36 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     return Status::OK();
   }
 
-  virtual bool HasDictionaryDecoder() {
+  virtual bool HasDictionaryDecoder() override {
     return dict_decoder_init_;
   }
 
-  virtual void ClearDictionaryDecoder() {
+  virtual void ClearDictionaryDecoder() override {
     dict_decoder_init_ = false;
   }
 
-  virtual Status InitDataPage(uint8_t* data, int size) {
-    // Data can be empty if the column contains all NULLs
-    DCHECK_GE(size, 0);
-    page_encoding_ = current_page_header_.data_page_header.encoding;
-    if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
-        page_encoding_ != parquet::Encoding::PLAIN) {
-      return GetUnsupportedDecodingError();
-    }
-
-    // If slot_desc_ is NULL, dict_decoder_ is uninitialized
-    if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY && slot_desc_ != NULL) {
-      if (!dict_decoder_init_) {
-        return Status("File corrupt. Missing dictionary page.");
-      }
-      RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
-    }
-
-    // TODO: Perform filter selectivity checks here.
-    return Status::OK();
-  }
+  virtual Status InitDataPage(uint8_t* data, int size) override;
 
  private:
-  /// Writes the next value into the appropriate destination slot in 'tuple' using pool
-  /// if necessary.
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
+  /// Writes the next value into the appropriate destination slot in 'tuple'. Returns
+  /// false if execution should be aborted for some reason, e.g. parse_error_ is set, the
+  /// query is cancelled, or the scan node limit was reached. Otherwise returns true.
   ///
   /// Force inlining - GCC does not always inline this into hot loops.
-  template <bool IS_DICT_ENCODED, bool NEEDS_CONVERSION>
-  ALWAYS_INLINE bool ReadSlot(Tuple* tuple, MemPool* pool) {
-    void* slot = tuple->GetSlot(tuple_offset_);
-    // Use an uninitialized stack allocation for temporary value to avoid running
-    // constructors doing work unnecessarily, e.g. if T == StringValue.
-    alignas(InternalType) uint8_t val_buf[sizeof(InternalType)];
-    InternalType* val_ptr = reinterpret_cast<InternalType*>(
-        NEEDS_CONVERSION ? val_buf : slot);
-    if (IS_DICT_ENCODED) {
-      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
-      if (UNLIKELY(!dict_decoder_.GetNextValue(val_ptr))) {
-        SetDictDecodeError();
-        return false;
-      }
-    } else {
-      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN);
-      int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
-          data_,data_end_,fixed_len_size_, val_ptr);
-      if (UNLIKELY(encoded_len < 0)) {
-        SetPlainDecodeError();
-        return false;
-      }
-      data_ += encoded_len;
-    }
+  template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
+  inline ALWAYS_INLINE bool ReadSlot(Tuple* tuple);
 
-    if (UNLIKELY(NeedsValidationInline() && !ValidateSlot(val_ptr, tuple))) {
-      return false;
-    }
-    if (NEEDS_CONVERSION && !tuple->IsNull(null_indicator_offset_)
-            && UNLIKELY(!ConvertSlot(val_ptr, slot, pool))) {
-      return false;
-    }
-    return true;
-  }
+  /// Reads position into 'pos' and updates 'pos_current_value_' based on 'rep_level'.
+  /// This helper is only used on the batched decoding path where we need to reset
+  /// 'pos_current_value_' to 0 based on 'rep_level'.
+  inline ALWAYS_INLINE void ReadPositionBatched(int16_t rep_level, int64_t* pos);
+
+  /// Decode one value from *data into 'val' and advance *data. 'data_end' is one byte
+  /// past the end of the buffer. Return false and set 'parse_error_' if there is an
+  /// error decoding the value.
+  template <Encoding::type ENCODING>
+  inline ALWAYS_INLINE bool DecodeValue(uint8_t** data, const uint8_t* data_end,
+      InternalType* RESTRICT val) RESTRICT;
 
   /// Most column readers never require conversion, so we can avoid branches by
   /// returning constant false. Column readers for types that require conversion
@@ -538,14 +333,15 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   }
 
   /// Converts and writes 'src' into 'slot' based on desc_->type()
-  bool ConvertSlot(const InternalType* src, void* slot, MemPool* pool) {
+  bool ConvertSlot(const InternalType* src, void* slot) {
     DCHECK(false);
     return false;
   }
 
-  /// Sets error message and returns false if the slot value is invalid, e.g., due to
-  /// being out of the valid value range.
-  bool ValidateSlot(InternalType* src, Tuple* tuple) const {
+  /// Checks if 'val' is invalid, e.g. due to being out of the valid value range. If it
+  /// is invalid, logs the error and returns false. If the error should stop execution,
+  /// sets 'parent_->parse_status_'.
+  bool ValidateValue(InternalType* val) const {
     DCHECK(false);
     return false;
   }
@@ -565,26 +361,301 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   DictDecoder<InternalType> dict_decoder_;
 
   /// True if dict_decoder_ has been initialized with a dictionary page.
-  bool dict_decoder_init_;
+  bool dict_decoder_init_ = false;
 
   /// true if decoded values must be converted before being written to an output tuple.
-  bool needs_conversion_;
+  bool needs_conversion_ = false;
 
   /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
   /// the max length for VARCHAR columns. Unused otherwise.
   int fixed_len_size_;
 };
 
-template<>
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader(
+    HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc)
+  : BaseScalarColumnReader(parent, node, slot_desc),
+    dict_decoder_(parent->scan_node_->mem_tracker()) {
+  if (!MATERIALIZED) {
+    // We're not materializing any values, just counting them. No need (or ability) to
+    // initialize state used to materialize values.
+    DCHECK(slot_desc_ == nullptr);
+    return;
+  }
+
+  DCHECK(slot_desc_ != nullptr);
+  DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
+  if (slot_desc_->type().type == TYPE_DECIMAL
+      && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+    fixed_len_size_ = node.element->type_length;
+  } else if (slot_desc_->type().type == TYPE_VARCHAR) {
+    fixed_len_size_ = slot_desc_->type().len;
+  } else {
+    fixed_len_size_ = -1;
+  }
+  needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
+      // TODO: Add logic to detect file versions that have unconverted TIMESTAMP
+      // values. Currently all versions have converted values.
+      (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
+      slot_desc_->type().type == TYPE_TIMESTAMP &&
+      parent->file_version_.application == "parquet-mr");
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPage(
+    uint8_t* data, int size) {
+  // Data can be empty if the column contains all NULLs
+  DCHECK_GE(size, 0);
+  page_encoding_ = current_page_header_.data_page_header.encoding;
+  if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
+      page_encoding_ != parquet::Encoding::PLAIN) {
+    return GetUnsupportedDecodingError();
+  }
+
+  // If slot_desc_ is NULL, we don't need to decode any values, so dict_decoder_ does
+  // not need to be initialized.
+  if (page_encoding_ == Encoding::PLAIN_DICTIONARY && slot_desc_ != nullptr) {
+    if (!dict_decoder_init_) {
+      return Status("File corrupt. Missing dictionary page.");
+    }
+    RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
+  }
+  // TODO: Perform filter selectivity checks here.
+  return Status::OK();
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValue(
+    Tuple* tuple) {
+  // NextLevels() should have already been called and def and rep levels should be in
+  // valid range.
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a value";
+
+  if (MATERIALIZED) {
+    if (def_level_ >= max_def_level()) {
+      bool continue_execution;
+      if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+        continue_execution = NeedsConversionInline() ?
+            ReadSlot<Encoding::PLAIN_DICTIONARY, true>(tuple) :
+            ReadSlot<Encoding::PLAIN_DICTIONARY, false>(tuple);
+      } else {
+        DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+        continue_execution = NeedsConversionInline() ?
+            ReadSlot<Encoding::PLAIN, true>(tuple) :
+            ReadSlot<Encoding::PLAIN, false>(tuple);
+      }
+      if (!continue_execution) return false;
+    } else {
+      tuple->SetNull(null_indicator_offset_);
+    }
+  }
+  return NextLevels<IN_COLLECTION>();
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatch(
+    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+    int* RESTRICT num_values) RESTRICT {
+  // Repetition level is only present if this column is nested in a collection type.
+  if (IN_COLLECTION) {
+    DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
+  } else {
+    DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
+  }
+
+  int val_count = 0;
+  bool continue_execution = true;
+  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+    DCHECK_GE(num_buffered_values_, 0);
+    // Read next page if necessary.
+    if (num_buffered_values_ == 0) {
+      if (!NextPage()) {
+        continue_execution = parent_->parse_status_.ok();
+        continue;
+      }
+    }
+
+    // Not materializing anything - skip decoding any levels and rely on the value
+    // count from page metadata to return the correct number of rows.
+    if (!MATERIALIZED && !IN_COLLECTION) {
+      int vals_to_add = min(num_buffered_values_, max_values - val_count);
+      val_count += vals_to_add;
+      num_buffered_values_ -= vals_to_add;
+      continue;
+    }
+    // Fill the rep level cache if needed. We are flattening out the fields of the
+    // nested collection into the top-level tuple returned by the scan, so we don't
+    // care about the nesting structure unless the position slot is being populated.
+    if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) {
+      parent_->parse_status_.MergeStatus(
+          rep_levels_.CacheNextBatch(num_buffered_values_));
+      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+    }
+
+    // Fill def level cache if needed.
+    if (!def_levels_.CacheHasNext()) {
+      // TODO: add a fast path here if there's a run of repeated values.
+      parent_->parse_status_.MergeStatus(
+          def_levels_.CacheNextBatch(num_buffered_values_));
+      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+    }
+
+    // Read data page and cached levels to materialize values.
+    uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
+    int remaining_val_capacity = max_values - val_count;
+    int ret_val_count = 0;
+    continue_execution = MaterializeValueBatch<IN_COLLECTION>(
+        remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+    val_count += ret_val_count;
+    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
+      continue_execution &= ColReaderDebugAction(&val_count);
+    }
+  }
+  *num_values = val_count;
+  return continue_execution;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
+    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+    int* RESTRICT num_values) RESTRICT {
+  DCHECK(MATERIALIZED || IN_COLLECTION);
+  DCHECK_GT(num_buffered_values_, 0);
+  DCHECK(def_levels_.CacheHasNext());
+  if (IN_COLLECTION && pos_slot_desc_ != nullptr) DCHECK(rep_levels_.CacheHasNext());
+  const int cache_start_idx = def_levels_.CacheCurrIdx();
+  uint8_t* curr_tuple = tuple_mem;
+  int val_count = 0;
+  while (def_levels_.CacheHasNext() && val_count < max_values) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+    int def_level = def_levels_.CacheGetNext();
+
+    if (IN_COLLECTION) {
+      if (def_level < def_level_of_immediate_repeated_ancestor()) {
+        // A containing repeated field is empty or NULL. Skip the value but
+        // move to the next repetition level if necessary.
+        if (pos_slot_desc_ != nullptr) rep_levels_.CacheSkipLevels(1);
+        continue;
+      }
+      if (pos_slot_desc_ != nullptr) {
+        ReadPositionBatched(rep_levels_.CacheGetNext(),
+            tuple->GetBigIntSlot(pos_slot_desc_->tuple_offset()));
+      }
+    }
+
+    if (MATERIALIZED) {
+      if (def_level >= max_def_level()) {
+        bool continue_execution = ReadSlot<ENCODING, NEEDS_CONVERSION>(tuple);
+        if (UNLIKELY(!continue_execution)) return false;
+      } else {
+        tuple->SetNull(null_indicator_offset_);
+      }
+    }
+    curr_tuple += tuple_size;
+    ++val_count;
+  }
+  num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
+  *num_values = val_count;
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
+    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+    int* RESTRICT num_values) RESTRICT {
+  // Dispatch to the correct templated implementation of MaterializeValueBatch().
+  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+    if (NeedsConversionInline()) {
+      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, true>(
+          max_values, tuple_size, tuple_mem, num_values);
+    } else {
+      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, false>(
+          max_values, tuple_size, tuple_mem, num_values);
+    }
+  } else {
+    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+    if (NeedsConversionInline()) {
+      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, true>(
+          max_values, tuple_size, tuple_mem, num_values);
+    } else {
+      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, false>(
+          max_values, tuple_size, tuple_mem, num_values);
+    }
+  }
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlot(
+    Tuple* RESTRICT tuple) RESTRICT {
+  void* slot = tuple->GetSlot(tuple_offset_);
+  // Use an uninitialized stack allocation for temporary value to avoid running
+  // constructors doing work unnecessarily, e.g. if T == StringValue.
+  alignas(InternalType) uint8_t val_buf[sizeof(InternalType)];
+  InternalType* val_ptr =
+      reinterpret_cast<InternalType*>(NEEDS_CONVERSION ? val_buf : slot);
+
+  if (UNLIKELY(!DecodeValue<ENCODING>(&data_, data_end_, val_ptr))) return false;
+  if (UNLIKELY(NeedsValidationInline() && !ValidateValue(val_ptr))) {
+    if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+    // The value is invalid but execution should continue - set the null indicator and
+    // skip conversion.
+    tuple->SetNull(null_indicator_offset_);
+    return true;
+  }
+  if (NEEDS_CONVERSION && UNLIKELY(!ConvertSlot(val_ptr, slot))) return false;
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <Encoding::type ENCODING>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
+    uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
+    InternalType* RESTRICT val) RESTRICT {
+  DCHECK_EQ(page_encoding_, ENCODING);
+  if (ENCODING == Encoding::PLAIN_DICTIONARY) {
+    if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
+      SetDictDecodeError();
+      return false;
+    }
+  } else {
+    DCHECK_EQ(ENCODING, Encoding::PLAIN);
+    int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
+        *data, data_end, fixed_len_size_, val);
+    if (UNLIKELY(encoded_len < 0)) {
+      SetPlainDecodeError();
+      return false;
+    }
+    *data += encoded_len;
+  }
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
+    ::ReadPositionBatched(int16_t rep_level, int64_t* pos) {
+  // Reset position counter if we are at the start of a new parent collection.
+  if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
+  *pos = pos_current_value_++;
+}
+
+template <>
 inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>
 ::NeedsConversionInline() const {
   return needs_conversion_;
 }
 
-template<>
+template <>
 bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSlot(
-    const StringValue* src, void* slot, MemPool* pool) {
-  DCHECK(slot_desc() != NULL);
+    const StringValue* src, void* slot) {
+  DCHECK(slot_desc() != nullptr);
   DCHECK(slot_desc()->type().type == TYPE_CHAR);
   int char_len = slot_desc()->type().len;
   int unpadded_len = min(char_len, src->len);
@@ -594,15 +665,15 @@ bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSl
   return true;
 }
 
-template<>
+template <>
 inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
 ::NeedsConversionInline() const {
   return needs_conversion_;
 }
 
-template<>
+template <>
 bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot(
-    const TimestampValue* src, void* slot, MemPool* pool) {
+    const TimestampValue* src, void* slot) {
   // Conversion should only happen when this flag is enabled.
   DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
   TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
@@ -611,24 +682,21 @@ bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot
   return true;
 }
 
-template<>
+template <>
 inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
 ::NeedsValidationInline() const {
   return true;
 }
 
-template<>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateSlot(
-    TimestampValue* src, Tuple* tuple) const {
-  if (UNLIKELY(!TimestampValue::IsValidDate(src->date()))) {
+template <>
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateValue(
+    TimestampValue* val) const {
+  if (UNLIKELY(!TimestampValue::IsValidDate(val->date()))) {
     ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE,
         filename(), node_.element->name);
     Status status = parent_->state_->LogOrReturnError(msg);
-    if (!status.ok()) {
-      parent_->parse_status_ = status;
-      return false;
-    }
-    tuple->SetNull(null_indicator_offset_);
+    if (!status.ok()) parent_->parse_status_ = status;
+    return false;
   }
   return true;
 }
@@ -638,107 +706,45 @@ class BoolColumnReader : public BaseScalarColumnReader {
   BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
     : BaseScalarColumnReader(parent, node, slot_desc) {
-    if (slot_desc_ != NULL) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
+    if (slot_desc_ != nullptr) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
   }
 
   virtual ~BoolColumnReader() { }
 
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<true>(pool, tuple);
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
+    return ReadValue<true>(tuple);
   }
 
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<false>(pool, tuple);
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
+    return ReadValue<false>(tuple);
   }
 
  protected:
   virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) {
+      DictDecoderBase** decoder) override {
     DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
                   << "have gotten this far.";
     return Status::OK();
   }
 
-  virtual bool HasDictionaryDecoder() {
+  virtual bool HasDictionaryDecoder() override {
     // Decoder should never be created for bools.
     return false;
   }
 
-  virtual void ClearDictionaryDecoder() { }
+  virtual void ClearDictionaryDecoder() override { }
 
-  virtual Status InitDataPage(uint8_t* data, int size) {
-    page_encoding_ = current_page_header_.data_page_header.encoding;
-    // Only the relevant decoder is initialized for a given data page.
-    switch (page_encoding_) {
-      case parquet::Encoding::PLAIN:
-        bool_values_.Reset(data, size);
-        break;
-      case parquet::Encoding::RLE:
-        // The first 4 bytes contain the size of the encoded data. This information is
-        // redundant, as this is the last part of the data page, and the number of
-        // remaining bytes is already known.
-        rle_decoder_.Reset(data + 4, size - 4, 1);
-        break;
-      default:
-        return GetUnsupportedDecodingError();
-    }
-    num_unpacked_values_ = 0;
-    unpacked_value_idx_ = 0;
-    return Status::OK();
-  }
+  virtual Status InitDataPage(uint8_t* data, int size) override;
 
  private:
-  template<bool IN_COLLECTION>
-  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
-    DCHECK(slot_desc_ != NULL);
-    // Def and rep levels should be in valid range.
-    DCHECK_GE(rep_level_, 0);
-    DCHECK_GE(def_level_, 0);
-    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-        "Caller should have called NextLevels() until we are ready to read a value";
-
-    if (def_level_ >= max_def_level()) {
-      return ReadSlot<IN_COLLECTION>(tuple, pool);
-    } else {
-      // Null value
-      tuple->SetNull(null_indicator_offset_);
-      return NextLevels<IN_COLLECTION>();
-    }
-  }
-
-  /// Writes the next value into the next slot in the *tuple using pool if necessary.
-  /// Also advances def_level_ and rep_level_ via NextLevels().
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  template<bool IN_COLLECTION>
-  inline bool ReadSlot(Tuple* tuple, MemPool* pool)  {
-    void* slot = tuple->GetSlot(tuple_offset_);
-    bool val;
-    if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
-      val = unpacked_values_[unpacked_value_idx_++];
-    } else {
-      // Unpack as many values as we can into the buffer. We expect to read at least one
-      // value.
-      if (page_encoding_ == parquet::Encoding::PLAIN) {
-        num_unpacked_values_ =
-            bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
-      } else {
-        num_unpacked_values_ =
-            rle_decoder_.GetValues(UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
-      }
+  template <bool IN_COLLECTION>
+  inline bool ReadValue(Tuple* tuple);
 
-      if (UNLIKELY(num_unpacked_values_ == 0)) {
-        parent_->parse_status_ = Status("Invalid bool column.");
-        return false;
-      }
-      val = unpacked_values_[0];
-      unpacked_value_idx_ = 1;
-    }
-    *reinterpret_cast<bool*>(slot) = val;
-    return NextLevels<IN_COLLECTION>();
-  }
+  /// Decodes the next value into 'value'. Returns false and sets
+  /// 'parent_->parse_status_' if an error is encountered decoding the
+  /// value. Otherwise returns true.
+  template <bool IN_COLLECTION>
+  inline bool DecodeValue(bool* value);
 
   /// A buffer to store unpacked values. Must be a multiple of 32 size to use the
   /// batch-oriented interface of BatchedBitReader.
@@ -758,6 +764,71 @@ class BoolColumnReader : public BaseScalarColumnReader {
   RleBatchDecoder<bool> rle_decoder_;
 };
 
+Status BoolColumnReader::InitDataPage(uint8_t* data, int size) {
+  page_encoding_ = current_page_header_.data_page_header.encoding;
+  // Only the relevant decoder is initialized for a given data page.
+  switch (page_encoding_) {
+    case parquet::Encoding::PLAIN:
+      bool_values_.Reset(data, size);
+      break;
+    case parquet::Encoding::RLE:
+      // The first 4 bytes contain the size of the encoded data. This information is
+      // redundant, as this is the last part of the data page, and the number of
+      // remaining bytes is already known.
+      rle_decoder_.Reset(data + 4, size - 4, 1);
+      break;
+    default:
+      return GetUnsupportedDecodingError();
+  }
+  num_unpacked_values_ = 0;
+  unpacked_value_idx_ = 0;
+  return Status::OK();
+}
+
+template <bool IN_COLLECTION>
+bool BoolColumnReader::ReadValue(Tuple* tuple) {
+  DCHECK(slot_desc_ != nullptr);
+  // Def and rep levels should be in valid range.
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a value";
+
+  if (def_level_ >= max_def_level()) {
+    bool* slot = tuple->GetBoolSlot(tuple_offset_);
+    if (UNLIKELY(!DecodeValue<IN_COLLECTION>(slot))) return false;
+  } else {
+    // Null value
+    tuple->SetNull(null_indicator_offset_);
+  }
+  return NextLevels<IN_COLLECTION>();
+}
+
+template <bool IN_COLLECTION>
+bool BoolColumnReader::DecodeValue(bool* value) {
+  if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
+    *value = unpacked_values_[unpacked_value_idx_++];
+  } else {
+    // Unpack as many values as we can into the buffer. We expect to read at least one
+    // value.
+    if (page_encoding_ == parquet::Encoding::PLAIN) {
+      num_unpacked_values_ =
+          bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+    } else {
+      num_unpacked_values_ =
+          rle_decoder_.GetValues(UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+    }
+
+    if (UNLIKELY(num_unpacked_values_ == 0)) {
+      parent_->parse_status_ = Status("Invalid bool column.");
+      return false;
+    }
+    *value = unpacked_values_[0];
+    unpacked_value_idx_ = 1;
+  }
+  return true;
+}
+
 // Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
 // path doesn't falsely report that the file is corrupted.
 bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
@@ -789,7 +860,9 @@ bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
       continue;
     }
     // Fill in position slot if applicable
-    if (pos_slot_desc_ != NULL) ReadPosition(tuple);
+    if (pos_slot_desc_ != nullptr) {
+      ReadPositionNonBatched(tuple->GetBigIntSlot(pos_slot_desc()->tuple_offset()));
+    }
     continue_execution = ReadValue(pool, tuple);
     ++val_count;
     if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
@@ -821,17 +894,14 @@ bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
   return continue_execution;
 }
 
-void ParquetColumnReader::ReadPosition(Tuple* tuple) {
-  DCHECK(pos_slot_desc() != NULL);
+void ParquetColumnReader::ReadPositionNonBatched(int64_t* pos) {
   // NextLevels() should have already been called
   DCHECK_GE(rep_level_, 0);
   DCHECK_GE(def_level_, 0);
   DCHECK_GE(pos_current_value_, 0);
   DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
       "Caller should have called NextLevels() until we are ready to read a value";
-
-  void* slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
-  *reinterpret_cast<int64_t*>(slot) = pos_current_value_++;
+  *pos = pos_current_value_++;
 }
 
 // In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
@@ -1055,8 +1125,8 @@ Status BaseScalarColumnReader::InitDictionary() {
     }
   }
   if (dict_header != nullptr &&
-      dict_header->encoding != parquet::Encoding::PLAIN &&
-      dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) {
+      dict_header->encoding != Encoding::PLAIN &&
+      dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
     return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
                   "for dictionary pages.");
   }
@@ -1210,7 +1280,7 @@ Status BaseScalarColumnReader::ReadDataPage() {
     num_values_read_ += num_buffered_values_;
 
     int uncompressed_size = current_page_header_.uncompressed_page_size;
-    if (decompressor_.get() != NULL) {
+    if (decompressor_.get() != nullptr) {
       SCOPED_TIMER(parent_->decompress_timer_);
       uint8_t* decompressed_buffer;
       RETURN_IF_ERROR(AllocateUncompressedDataPage(
@@ -1281,7 +1351,7 @@ Status BaseScalarColumnReader::AllocateUncompressedDataPage(int64_t size,
   return Status::OK();
 }
 
-template<bool ADVANCE_REP_LEVEL>
+template <bool ADVANCE_REP_LEVEL>
 bool BaseScalarColumnReader::NextLevels() {
   if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
 
@@ -1369,7 +1439,7 @@ bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
   if (tuple_offset_ == -1) {
     return CollectionColumnReader::NextLevels();
   } else if (def_level_ >= max_def_level()) {
-    return ReadSlot(tuple, pool);
+    return ReadSlot(tuple->GetCollectionSlot(tuple_offset_), pool);
   } else {
     // Null value
     tuple->SetNull(null_indicator_offset_);
@@ -1377,21 +1447,18 @@ bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
   }
 }
 
-bool CollectionColumnReader::ReadNonRepeatedValue(
-    MemPool* pool, Tuple* tuple) {
+bool CollectionColumnReader::ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
   return CollectionColumnReader::ReadValue(pool, tuple);
 }
 
-bool CollectionColumnReader::ReadSlot(Tuple* tuple, MemPool* pool) {
+bool CollectionColumnReader::ReadSlot(CollectionValue* slot, MemPool* pool) {
   DCHECK(!children_.empty());
   DCHECK_LE(rep_level_, new_collection_rep_level());
 
   // Recursively read the collection into a new CollectionValue.
-  CollectionValue* coll_slot = reinterpret_cast<CollectionValue*>(
-      tuple->GetSlot(tuple_offset_));
-  *coll_slot = CollectionValue();
+  *slot = CollectionValue();
   CollectionValueBuilder builder(
-      coll_slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
+      slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
   bool continue_execution = parent_->AssembleCollection(
       children_, new_collection_rep_level(), &builder);
   if (!continue_execution) return false;
@@ -1470,11 +1537,11 @@ static ParquetColumnReader* GetDecimalColumnReader(const SchemaNode& node,
 ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
     bool is_collection_field, const SlotDescriptor* slot_desc,
     HdfsParquetScanner* parent) {
-  ParquetColumnReader* reader = NULL;
+  ParquetColumnReader* reader = nullptr;
   if (is_collection_field) {
     // Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
     reader = new CollectionColumnReader(parent, node, slot_desc);
-  } else if (slot_desc != NULL) {
+  } else if (slot_desc != nullptr) {
     // Create the appropriate ScalarColumnReader type to read values into 'slot_desc'
     switch (slot_desc->type().type) {
       case TYPE_BOOLEAN:

http://git-wip-us.apache.org/repos/asf/impala/blob/51bc004d/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index b19f23e..022a868 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -228,10 +228,12 @@ class ParquetColumnReader {
   /// true.
   virtual bool NextLevels() = 0;
 
-  /// Should only be called if pos_slot_desc_ is non-NULL. Writes pos_current_value_ to
-  /// 'tuple' (i.e. "reads" the synthetic position field of the parent collection into
-  /// 'tuple') and increments pos_current_value_.
-  void ReadPosition(Tuple* tuple);
+  /// Writes pos_current_value_ (i.e. "reads" the synthetic position field of the
+  /// parent collection) to 'pos' and increments pos_current_value_. Only valid to
+  /// call when doing non-batched reading, i.e. NextLevels() must have been called
+  /// before each call to this function to advance to the next element in the
+  /// collection.
+  void ReadPositionNonBatched(int64_t* pos);
 
   /// Returns true if this column reader has reached the end of the row group.
   inline bool RowGroupAtEnd() {
@@ -247,7 +249,7 @@ class ParquetColumnReader {
  protected:
   HdfsParquetScanner* parent_;
   const SchemaNode& node_;
-  const SlotDescriptor* slot_desc_;
+  const SlotDescriptor* const slot_desc_;
 
   /// The slot descriptor for the position field of the tuple, if there is one. NULL if
   /// there's not. Only one column reader for a given tuple desc will have this set.
@@ -268,17 +270,17 @@ class ParquetColumnReader {
   /// INVALID_LEVEL and ROW_GROUP_END. The maximum values are cached here because they
   /// are accessed in inner loops.
   int16_t rep_level_;
-  int16_t max_rep_level_;
+  const int16_t max_rep_level_;
   int16_t def_level_;
-  int16_t max_def_level_;
+  const int16_t max_def_level_;
 
   // Cache frequently accessed members of slot_desc_ for perf.
 
   /// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL.
-  int tuple_offset_;
+  const int tuple_offset_;
 
   /// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL.
-  NullIndicatorOffset null_indicator_offset_;
+  const NullIndicatorOffset null_indicator_offset_;
 
   ParquetColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
@@ -575,13 +577,12 @@ class CollectionColumnReader : public ParquetColumnReader {
   void UpdateDerivedState();
 
   /// Recursively reads from children_ to assemble a single CollectionValue into
-  /// the appropriate destination slot in 'tuple'. Also advances rep_level_ and
-  /// def_level_ via NextLevels().
+  /// 'slot'. Also advances rep_level_ and def_level_ via NextLevels().
   ///
   /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
   /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
   /// true.
-  inline bool ReadSlot(Tuple* tuple, MemPool* pool);
+  inline bool ReadSlot(CollectionValue* slot, MemPool* pool);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/51bc004d/be/src/runtime/tuple.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index ae9be32..68f313d 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -243,6 +243,14 @@ class Tuple {
     return reinterpret_cast<const char*>(this) + offset;
   }
 
+  bool* GetBoolSlot(int offset) {
+    return static_cast<bool*>(GetSlot(offset));
+  }
+
+  int64_t* GetBigIntSlot(int offset) {
+    return static_cast<int64_t*>(GetSlot(offset));
+  }
+
   StringValue* GetStringSlot(int offset) {
     DCHECK(offset != -1);  // -1 offset indicates non-materialized slot
     return reinterpret_cast<StringValue*>(reinterpret_cast<char*>(this) + offset);

http://git-wip-us.apache.org/repos/asf/impala/blob/51bc004d/be/src/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h
index bac127a..e527dc8 100644
--- a/be/src/util/bit-stream-utils.h
+++ b/be/src/util/bit-stream-utils.h
@@ -114,6 +114,8 @@ class BatchedBitReader {
   /// Resets the read to start reading from the start of 'buffer'. The buffer's
   /// length is 'buffer_len'. Does not take ownership of the buffer.
   void Reset(const uint8_t* buffer, int64_t buffer_len) {
+    DCHECK(buffer != nullptr);
+    DCHECK_GE(buffer_len, 0);
     buffer_pos_ = buffer;
     buffer_end_ = buffer + buffer_len;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/51bc004d/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 83b6315..1491066 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -471,6 +471,8 @@ inline void RleEncoder::Clear() {
 
 template <typename T>
 inline void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) {
+  DCHECK(buffer != nullptr);
+  DCHECK_GE(buffer_len, 0);
   DCHECK_GE(bit_width, 0);
   DCHECK_LE(bit_width, BatchedBitReader::MAX_BITWIDTH);
   bit_reader_.Reset(buffer, buffer_len);