Posted to commits@impala.apache.org by ta...@apache.org on 2018/11/27 02:01:54 UTC

[04/14] impala git commit: IMPALA-7869: break up parquet-column-readers.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
new file mode 100644
index 0000000..fce14b1
--- /dev/null
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -0,0 +1,1605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet-column-readers.h"
+
+#include <sstream>
+#include <string>
+#include <boost/scoped_ptr.hpp>
+#include <gflags/gflags.h>
+#include <gutil/strings/substitute.h>
+
+#include "exec/parquet/hdfs-parquet-scanner.h"
+#include "exec/parquet/parquet-bool-decoder.h"
+#include "exec/parquet/parquet-level-decoder.h"
+#include "exec/parquet/parquet-metadata-utils.h"
+#include "exec/parquet/parquet-scratch-tuple-batch.h"
+#include "exec/read-write-util.h"
+#include "exec/scanner-context.inline.h"
+#include "parquet-collection-column-reader.h"
+#include "rpc/thrift-util.h"
+#include "runtime/exec-env.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/request-context.h"
+#include "runtime/mem-pool.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "runtime/tuple.h"
+#include "util/bit-util.h"
+#include "util/codec.h"
+#include "util/debug-util.h"
+#include "util/dict-encoding.h"
+#include "util/rle-encoding.h"
+
+#include "common/names.h"
+
+// Provide a workaround for IMPALA-1658.
+DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
+    "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
+    "be converted from UTC to local time. Writes are unaffected.");
+
+// Max dictionary page header size in bytes. This is an estimate and only needs to be an
+// upper bound.
+static const int MAX_DICT_HEADER_SIZE = 100;
+
+// Max data page header size in bytes. This is an estimate and only needs to be an upper
+// bound. It is theoretically possible to have a page header of any size due to string
+// value statistics, but in practice we'll have trouble reading string values this large.
+// Also, this limit is in place to prevent impala from reading corrupt parquet files.
+DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
+
+using namespace impala::io;
+
+using parquet::Encoding;
+
+namespace impala {
+
+const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
+    "ParquetColumnReader::$0() failed to allocate $1 bytes for $2.";
+
+// Definition of variable declared in header for use of the
+// SHOULD_TRIGGER_COL_READER_DEBUG_ACTION macro.
+int parquet_column_reader_debug_count = 0;
+
+/// Per column type reader. InternalType is the datatype that Impala uses internally to
+/// store tuple data and PARQUET_TYPE is the corresponding primitive datatype (as defined
+/// in the parquet spec) that is used to store column values in parquet files.
+/// If MATERIALIZED is true, the column values are materialized into the slot described
+/// by slot_desc. If MATERIALIZED is false, the column values are not materialized, but
+/// the position can be accessed.
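+/// Example instantiations (illustrative): an INT column materialized into a slot would
+/// typically use ScalarColumnReader<int32_t, parquet::Type::INT32, true>, while a
+/// reader that only counts values (no slot to populate) uses MATERIALIZED = false.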
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+class ScalarColumnReader : public BaseScalarColumnReader {
+ public:
+  ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc);
+  virtual ~ScalarColumnReader() { }
+
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
+    return ReadValue<true>(tuple);
+  }
+
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
+    return ReadValue<false>(tuple);
+  }
+
+  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) override {
+    return ReadValueBatch<true>(max_values, tuple_size, tuple_mem, num_values);
+  }
+
+  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) override {
+    return ReadValueBatch<false>(max_values, tuple_size, tuple_mem, num_values);
+  }
+
+  virtual DictDecoderBase* GetDictionaryDecoder() override {
+    return HasDictionaryDecoder() ? &dict_decoder_ : nullptr;
+  }
+
+  virtual bool NeedsConversion() override { return NeedsConversionInline(); }
+  virtual bool NeedsValidation() override { return NeedsValidationInline(); }
+
+ protected:
+  template <bool IN_COLLECTION>
+  inline bool ReadValue(Tuple* tuple);
+
+  /// Implementation of the ReadValueBatch() functions specialized for this
+  /// column reader type. This function drives the reading of data pages and
+  /// caching of rep/def levels. Once a data page and cached levels are available,
+  /// it calls into a more specialized MaterializeValueBatch() for doing the actual
+  /// value materialization using the level caches.
+  /// Use RESTRICT so that the compiler knows that it is safe to cache member
+  /// variables in registers or on the stack (otherwise gcc's alias analysis
+  /// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
+  /// 'def_levels_' and 'rep_levels_' buffers may alias 'this', especially with
+  /// -fno-strict-aliasing).
+  template <bool IN_COLLECTION>
+  bool ReadValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+      int* RESTRICT num_values) RESTRICT;
+
+  /// Helper function for ReadValueBatch() above that performs value materialization.
+  /// It assumes a data page with remaining values is available, and that the def/rep
+  /// level caches have been populated. Materializes values into 'tuple_mem' with a
+  /// stride of 'tuple_size' and updates 'num_buffered_values_'. Returns the number of
+  /// values materialized in 'num_values'.
+  /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not
+  /// handled in this function.
+  /// Use RESTRICT so that the compiler knows that it is safe to cache member
+  /// variables in registers or on the stack (otherwise gcc's alias analysis
+  /// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
+  /// 'def_levels_' and 'rep_levels_' buffers may alias 'this', especially with
+  /// -fno-strict-aliasing).
+  template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
+  bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+      int* RESTRICT num_values) RESTRICT;
+
+  /// Same as above, but dispatches to the appropriate templated implementation of
+  /// MaterializeValueBatch() based on 'page_encoding_' and NeedsConversionInline().
+  template <bool IN_COLLECTION>
+  bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+      int* RESTRICT num_values) RESTRICT;
+
+  /// Fast path for MaterializeValueBatch() that materializes values for a run of
+  /// repeated definition levels. Read up to 'max_values' values into 'tuple_mem',
+  /// returning the number of values materialised in 'num_values'.
+  bool MaterializeValueBatchRepeatedDefLevel(int max_values, int tuple_size,
+      uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT;
+
+  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem'.
+  bool ReadSlots(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+
+  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem', when
+  /// conversion is needed.
+  bool ReadAndConvertSlots(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+
+  /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem', when
+  /// conversion is not needed.
+  bool ReadSlotsNoConversion(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+
+  /// Read 'num_to_read' position values into a batch of tuples starting at 'tuple_mem'.
+  void ReadPositions(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+
+  virtual Status CreateDictionaryDecoder(
+      uint8_t* values, int size, DictDecoderBase** decoder) override {
+    DCHECK(slot_desc_->type().type != TYPE_BOOLEAN)
+        << "Dictionary encoding is not supported for bools. Should never have gotten "
+        << "this far.";
+    if (!dict_decoder_.template Reset<PARQUET_TYPE>(values, size, fixed_len_size_)) {
+      return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+          slot_desc_->type().DebugString(), "could not decode dictionary");
+    }
+    dict_decoder_init_ = true;
+    *decoder = &dict_decoder_;
+    return Status::OK();
+  }
+
+  virtual bool HasDictionaryDecoder() override {
+    return dict_decoder_init_;
+  }
+
+  virtual void ClearDictionaryDecoder() override {
+    dict_decoder_init_ = false;
+  }
+
+  virtual Status InitDataPage(uint8_t* data, int size) override;
+
+ private:
+  /// Writes the next value into the appropriate destination slot in 'tuple'. Returns
+  /// false if execution should be aborted for some reason, e.g. parse_error_ is set, the
+  /// query is cancelled, or the scan node limit was reached. Otherwise returns true.
+  ///
+  /// Force inlining - GCC does not always inline this into hot loops.
+  template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
+  inline ALWAYS_INLINE bool ReadSlot(Tuple* tuple);
+
+  /// Reads position into 'pos' and updates 'pos_current_value_' based on 'rep_level'.
+  /// This helper is only used on the batched decoding path where we need to reset
+  /// 'pos_current_value_' to 0 based on 'rep_level'.
+  inline ALWAYS_INLINE void ReadPositionBatched(int16_t rep_level, int64_t* pos);
+
+  /// Decode one value from *data into 'val' and advance *data. 'data_end' is one byte
+  /// past the end of the buffer. Return false and set 'parse_error_' if there is an
+  /// error decoding the value.
+  template <Encoding::type ENCODING>
+  inline ALWAYS_INLINE bool DecodeValue(
+      uint8_t** data, const uint8_t* data_end, InternalType* RESTRICT val) RESTRICT;
+
+  /// Decode multiple values into 'out_vals' with a stride of 'stride' bytes. Return
+  /// false and set 'parse_error_' if there is an error decoding any value.
+  inline ALWAYS_INLINE bool DecodeValues(
+      int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT;
+  /// Specialisation of DecodeValues for a particular encoding, to allow overriding
+  /// specific instances via template specialisation.
+  template <Encoding::type ENCODING>
+  inline ALWAYS_INLINE bool DecodeValues(
+      int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT;
+
+  /// Most column readers never require conversion, so we can avoid branches by
+  /// returning constant false. Column readers for types that require conversion
+  /// must specialize this function.
+  inline bool NeedsConversionInline() const {
+    DCHECK(!needs_conversion_);
+    return false;
+  }
+
+  /// Similar to NeedsConversion(), most column readers do not require validation,
+  /// so to avoid branches, we return constant false. In general, types where not
+  /// all possible bit representations of the data type are valid should be
+  /// validated.
+  inline bool NeedsValidationInline() const {
+    return false;
+  }
+
+  /// Converts and writes 'src' into 'slot' based on desc_->type()
+  bool ConvertSlot(const InternalType* src, void* slot) {
+    DCHECK(false);
+    return false;
+  }
+
+  /// Checks if 'val' is invalid, e.g. due to being out of the valid value range. If it
+  /// is invalid, logs the error and returns false. If the error should stop execution,
+  /// sets 'parent_->parse_status_'.
+  bool ValidateValue(InternalType* val) const {
+    DCHECK(false);
+    return false;
+  }
+
+  /// Pull out slow-path Status construction code
+  void __attribute__((noinline)) SetDictDecodeError() {
+    parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(),
+        slot_desc_->type().DebugString(), stream_->file_offset());
+  }
+
+  void __attribute__((noinline)) SetPlainDecodeError() {
+    parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(),
+        slot_desc_->type().DebugString(), stream_->file_offset());
+  }
+
+  void __attribute__((noinline)) SetBoolDecodeError() {
+    parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_BOOL_VALUE, filename(),
+        PrintThriftEnum(page_encoding_), stream_->file_offset());
+  }
+
+
+  /// Dictionary decoder for decoding column values.
+  DictDecoder<InternalType> dict_decoder_;
+
+  /// True if dict_decoder_ has been initialized with a dictionary page.
+  bool dict_decoder_init_ = false;
+
+  /// true if decoded values must be converted before being written to an output tuple.
+  bool needs_conversion_ = false;
+
+  /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
+  /// the max length for VARCHAR columns. Unused otherwise.
+  int fixed_len_size_;
+
+  /// Contains extra data needed for Timestamp decoding.
+  ParquetTimestampDecoder timestamp_decoder_;
+
+  /// Contains extra state required to decode boolean values. Only initialised for
+  /// BOOLEAN columns.
+  unique_ptr<ParquetBoolDecoder> bool_decoder_;
+
+  /// Allocated from parent_->perm_pool_ if NeedsConversion() is true and null otherwise.
+  uint8_t* conversion_buffer_ = nullptr;
+};
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader(
+    HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc)
+  : BaseScalarColumnReader(parent, node, slot_desc),
+    dict_decoder_(parent->scan_node_->mem_tracker()) {
+  if (!MATERIALIZED) {
+    // We're not materializing any values, just counting them. No need (or ability) to
+    // initialize state used to materialize values.
+    DCHECK(slot_desc_ == nullptr);
+    return;
+  }
+
+  DCHECK(slot_desc_ != nullptr);
+  if (slot_desc_->type().type == TYPE_DECIMAL
+      && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+    fixed_len_size_ = node.element->type_length;
+  } else if (slot_desc_->type().type == TYPE_VARCHAR) {
+    fixed_len_size_ = slot_desc_->type().len;
+  } else {
+    fixed_len_size_ = -1;
+  }
+
+  needs_conversion_ = slot_desc_->type().type == TYPE_CHAR;
+
+  if (slot_desc_->type().type == TYPE_TIMESTAMP) {
+    timestamp_decoder_ = parent->CreateTimestampDecoder(*node.element);
+    dict_decoder_.SetTimestampHelper(timestamp_decoder_);
+    needs_conversion_ = timestamp_decoder_.NeedsConversion();
+  }
+  if (slot_desc_->type().type == TYPE_BOOLEAN) {
+    bool_decoder_ = make_unique<ParquetBoolDecoder>();
+  }
+}
+
+// TODO: consider performing filter selectivity checks in this function.
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPage(
+    uint8_t* data, int size) {
+  // Data can be empty if the column contains all NULLs
+  DCHECK_GE(size, 0);
+  DCHECK(slot_desc_ == nullptr || slot_desc_->type().type != TYPE_BOOLEAN)
+      << "Bool has specialized impl";
+  page_encoding_ = current_page_header_.data_page_header.encoding;
+  if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY
+      && page_encoding_ != parquet::Encoding::PLAIN) {
+    return GetUnsupportedDecodingError();
+  }
+
+  // If slot_desc_ is NULL, we don't need to decode any values, so dict_decoder_ does
+  // not need to be initialized.
+  if (page_encoding_ == Encoding::PLAIN_DICTIONARY && slot_desc_ != nullptr) {
+    if (!dict_decoder_init_) {
+      return Status("File corrupt. Missing dictionary page.");
+    }
+    RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
+  }
+  // Allocate a temporary buffer to hold InternalType values if we need to convert
+  // before writing to the final slot.
+  if (NeedsConversionInline() && conversion_buffer_ == nullptr) {
+    int64_t buffer_size = sizeof(InternalType) * parent_->state_->batch_size();
+    conversion_buffer_ =
+        parent_->perm_pool_->TryAllocateAligned(buffer_size, alignof(InternalType));
+    if (conversion_buffer_ == nullptr) {
+      return parent_->perm_pool_->mem_tracker()->MemLimitExceeded(parent_->state_,
+          "Failed to allocate conversion buffer in Parquet scanner", buffer_size);
+    }
+  }
+  return Status::OK();
+}
+
+template <>
+Status ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::InitDataPage(
+    uint8_t* data, int size) {
+  // Data can be empty if the column contains all NULLs
+  DCHECK_GE(size, 0);
+  page_encoding_ = current_page_header_.data_page_header.encoding;
+
+  /// Boolean decoding is delegated to 'bool_decoder_'.
+  if (bool_decoder_->SetData(page_encoding_, data, size)) return Status::OK();
+  return GetUnsupportedDecodingError();
+}
+
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValue(
+    Tuple* tuple) {
+  // NextLevels() should have already been called and def and rep levels should be in
+  // valid range.
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a value";
+
+  if (MATERIALIZED) {
+    if (def_level_ >= max_def_level()) {
+      bool continue_execution;
+      if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+        continue_execution = NeedsConversionInline() ?
+            ReadSlot<Encoding::PLAIN_DICTIONARY, true>(tuple) :
+            ReadSlot<Encoding::PLAIN_DICTIONARY, false>(tuple);
+      } else {
+        DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+        continue_execution = NeedsConversionInline() ?
+            ReadSlot<Encoding::PLAIN, true>(tuple) :
+            ReadSlot<Encoding::PLAIN, false>(tuple);
+      }
+      if (!continue_execution) return false;
+    } else {
+      tuple->SetNull(null_indicator_offset_);
+    }
+  }
+  return NextLevels<IN_COLLECTION>();
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatch(
+    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+    int* RESTRICT num_values) RESTRICT {
+  // Repetition level is only present if this column is nested in a collection type.
+  if (IN_COLLECTION) {
+    DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
+  } else {
+    DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
+  }
+
+  int val_count = 0;
+  bool continue_execution = true;
+  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+    DCHECK_GE(num_buffered_values_, 0);
+    // Read next page if necessary.
+    if (num_buffered_values_ == 0) {
+      if (!NextPage()) {
+        continue_execution = parent_->parse_status_.ok();
+        continue;
+      }
+    }
+
+    // Not materializing anything - skip decoding any levels and rely on the value
+    // count from page metadata to return the correct number of rows.
+    if (!MATERIALIZED && !IN_COLLECTION) {
+      int vals_to_add = min(num_buffered_values_, max_values - val_count);
+      val_count += vals_to_add;
+      num_buffered_values_ -= vals_to_add;
+      DCHECK_GE(num_buffered_values_, 0);
+      continue;
+    }
+    // Fill the rep level cache if needed. We are flattening out the fields of the
+    // nested collection into the top-level tuple returned by the scan, so we don't
+    // care about the nesting structure unless the position slot is being populated.
+    if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) {
+      parent_->parse_status_.MergeStatus(
+          rep_levels_.CacheNextBatch(num_buffered_values_));
+      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+    }
+
+    const int remaining_val_capacity = max_values - val_count;
+    uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
+    if (def_levels_.NextRepeatedRunLength() > 0) {
+      // Fast path to materialize a run of values with the same definition level. This
+      // avoids checking for NULL/not-NULL for every value.
+      int ret_val_count = 0;
+      continue_execution = MaterializeValueBatchRepeatedDefLevel(
+          remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      val_count += ret_val_count;
+    } else {
+      // We don't have a repeated run - cache def levels and process value-by-value.
+      if (!def_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(
+            def_levels_.CacheNextBatch(num_buffered_values_));
+        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+      }
+
+      // Read data page and cached levels to materialize values.
+      int ret_val_count = 0;
+      continue_execution = MaterializeValueBatch<IN_COLLECTION>(
+          remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      val_count += ret_val_count;
+    }
+    if (SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(val_count)) {
+      continue_execution &= ColReaderDebugAction(&val_count);
+    }
+  }
+  *num_values = val_count;
+  return continue_execution;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
+    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+    int* RESTRICT num_values) RESTRICT {
+  DCHECK(MATERIALIZED || IN_COLLECTION);
+  DCHECK_GT(num_buffered_values_, 0);
+  DCHECK(def_levels_.CacheHasNext());
+  if (IN_COLLECTION && pos_slot_desc_ != nullptr) DCHECK(rep_levels_.CacheHasNext());
+  const int cache_start_idx = def_levels_.CacheCurrIdx();
+  uint8_t* curr_tuple = tuple_mem;
+  int val_count = 0;
+  DCHECK_LE(def_levels_.CacheRemaining(), num_buffered_values_);
+  max_values = min(max_values, num_buffered_values_);
+  while (def_levels_.CacheHasNext() && val_count < max_values) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+    int def_level = def_levels_.CacheGetNext();
+
+    if (IN_COLLECTION) {
+      if (def_level < def_level_of_immediate_repeated_ancestor()) {
+        // A containing repeated field is empty or NULL. Skip the value but
+        // move to the next repetition level if necessary.
+        if (pos_slot_desc_ != nullptr) rep_levels_.CacheSkipLevels(1);
+        continue;
+      }
+      if (pos_slot_desc_ != nullptr) {
+        ReadPositionBatched(rep_levels_.CacheGetNext(),
+            tuple->GetBigIntSlot(pos_slot_desc_->tuple_offset()));
+      }
+    }
+
+    if (MATERIALIZED) {
+      if (def_level >= max_def_level()) {
+        bool continue_execution = ReadSlot<ENCODING, NEEDS_CONVERSION>(tuple);
+        if (UNLIKELY(!continue_execution)) return false;
+      } else {
+        tuple->SetNull(null_indicator_offset_);
+      }
+    }
+    curr_tuple += tuple_size;
+    ++val_count;
+  }
+  num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
+  DCHECK_GE(num_buffered_values_, 0);
+  *num_values = val_count;
+  return true;
+}
+
+// Note that the structure of this function is very similar to MaterializeValueBatch()
+// above, except it is unrolled to operate on multiple values at a time.
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE,
+    MATERIALIZED>::MaterializeValueBatchRepeatedDefLevel(int max_values, int tuple_size,
+    uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
+  DCHECK_GT(num_buffered_values_, 0);
+  if (pos_slot_desc_ != nullptr) DCHECK(rep_levels_.CacheHasNext());
+  int32_t def_level_repeats = def_levels_.NextRepeatedRunLength();
+  DCHECK_GT(def_level_repeats, 0);
+  // Peek at the def level. The number of def levels we'll consume depends on several
+  // conditions below.
+  uint8_t def_level = def_levels_.GetRepeatedValue(0);
+  int32_t num_def_levels_to_consume = 0;
+
+  if (def_level < def_level_of_immediate_repeated_ancestor()) {
+    DCHECK_GT(max_rep_level_, 0) << "Only possible if in a collection.";
+    // A containing repeated field is empty or NULL. We don't need to return any values
+    // but need to advance any rep levels.
+    if (pos_slot_desc_ != nullptr) {
+      num_def_levels_to_consume =
+          min<uint32_t>(def_level_repeats, rep_levels_.CacheRemaining());
+      rep_levels_.CacheSkipLevels(num_def_levels_to_consume);
+    } else {
+      num_def_levels_to_consume = def_level_repeats;
+    }
+    *num_values = 0;
+  } else {
+    // Cannot consume more levels than allowed by buffered input values and output space.
+    num_def_levels_to_consume =
+        min(num_buffered_values_, min(max_values, def_level_repeats));
+    if (pos_slot_desc_ != nullptr) {
+      num_def_levels_to_consume =
+          min<uint32_t>(num_def_levels_to_consume, rep_levels_.CacheRemaining());
+      ReadPositions(num_def_levels_to_consume, tuple_size, tuple_mem);
+    }
+    if (MATERIALIZED) {
+      if (def_level >= max_def_level()) {
+        if (!ReadSlots(num_def_levels_to_consume, tuple_size, tuple_mem)) {
+          return false;
+        }
+      } else {
+        Tuple::SetNullIndicators(
+            null_indicator_offset_, num_def_levels_to_consume, tuple_size, tuple_mem);
+      }
+    }
+    *num_values = num_def_levels_to_consume;
+  }
+  // We now know how many we actually consumed.
+  def_levels_.GetRepeatedValue(num_def_levels_to_consume);
+  num_buffered_values_ -= num_def_levels_to_consume;
+  DCHECK_GE(num_buffered_values_, 0);
+  return true;
+}
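+
+// Example (illustrative): for a top-level nullable column with max_def_level() == 1,
+// an RLE-encoded run of, say, four repeated def levels of 1 lets up to four non-NULL
+// values be decoded with a single ReadSlots() call, while a repeated run of 0s sets
+// the null indicators for the whole run via Tuple::SetNullIndicators().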
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <bool IN_COLLECTION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
+    int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
+    int* RESTRICT num_values) RESTRICT {
+  // Dispatch to the correct templated implementation of MaterializeValueBatch().
+  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+    if (NeedsConversionInline()) {
+      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, true>(
+          max_values, tuple_size, tuple_mem, num_values);
+    } else {
+      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, false>(
+          max_values, tuple_size, tuple_mem, num_values);
+    }
+  } else {
+    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+    if (NeedsConversionInline()) {
+      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, true>(
+          max_values, tuple_size, tuple_mem, num_values);
+    } else {
+      return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, false>(
+          max_values, tuple_size, tuple_mem, num_values);
+    }
+  }
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlot(
+    Tuple* RESTRICT tuple) RESTRICT {
+  void* slot = tuple->GetSlot(tuple_offset_);
+  // Use an uninitialized stack allocation for temporary value to avoid running
+  // constructors doing work unnecessarily, e.g. if T == StringValue.
+  alignas(InternalType) uint8_t val_buf[sizeof(InternalType)];
+  InternalType* val_ptr =
+      reinterpret_cast<InternalType*>(NEEDS_CONVERSION ? val_buf : slot);
+
+  if (UNLIKELY(!DecodeValue<ENCODING>(&data_, data_end_, val_ptr))) return false;
+  if (UNLIKELY(NeedsValidationInline() && !ValidateValue(val_ptr))) {
+    if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+    // The value is invalid but execution should continue - set the null indicator and
+    // skip conversion.
+    tuple->SetNull(null_indicator_offset_);
+    return true;
+  }
+  if (NEEDS_CONVERSION && UNLIKELY(!ConvertSlot(val_ptr, slot))) return false;
+  return true;
+}
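+
+// Illustration (not exhaustive): for a slot that needs conversion, e.g. a CHAR(n)
+// column (NEEDS_CONVERSION == true), the value is first decoded into the stack buffer
+// 'val_buf' and ConvertSlot() then writes the converted form into the slot; for a
+// plain numeric slot the value is decoded directly into the slot memory.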
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlots(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  if (NeedsConversionInline()) {
+    return ReadAndConvertSlots(num_to_read, tuple_size, tuple_mem);
+  } else {
+    return ReadSlotsNoConversion(num_to_read, tuple_size, tuple_mem);
+  }
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadAndConvertSlots(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  DCHECK(NeedsConversionInline());
+  DCHECK(conversion_buffer_ != nullptr);
+  InternalType* first_val = reinterpret_cast<InternalType*>(conversion_buffer_);
+  // Decode into the conversion buffer before doing the conversion into the output tuples.
+  if (!DecodeValues(sizeof(InternalType), num_to_read, first_val)) return false;
+
+  InternalType* curr_val = first_val;
+  uint8_t* curr_tuple = tuple_mem;
+  for (int64_t i = 0; i < num_to_read; ++i, ++curr_val, curr_tuple += tuple_size) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+    if (NeedsValidationInline() && UNLIKELY(!ValidateValue(curr_val))) {
+      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+      // The value is invalid but execution should continue - set the null indicator and
+      // skip conversion.
+      tuple->SetNull(null_indicator_offset_);
+      continue;
+    }
+    if (UNLIKELY(!ConvertSlot(curr_val, tuple->GetSlot(tuple_offset_)))) {
+      return false;
+    }
+  }
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlotsNoConversion(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  DCHECK(!NeedsConversionInline());
+  // No conversion needed - decode directly into the output slots.
+  InternalType* first_slot = reinterpret_cast<InternalType*>(tuple_mem + tuple_offset_);
+  if (!DecodeValues(tuple_size, num_to_read, first_slot)) return false;
+  if (NeedsValidationInline()) {
+    // Validate the written slots.
+    uint8_t* curr_tuple = tuple_mem;
+    for (int64_t i = 0; i < num_to_read; ++i, curr_tuple += tuple_size) {
+      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+      InternalType* val = static_cast<InternalType*>(tuple->GetSlot(tuple_offset_));
+      if (UNLIKELY(!ValidateValue(val))) {
+        if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+        // The value is invalid but execution should continue - set the null indicator and
+        // skip conversion.
+        tuple->SetNull(null_indicator_offset_);
+      }
+    }
+  }
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <Encoding::type ENCODING>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
+    uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
+    InternalType* RESTRICT val) RESTRICT {
+  DCHECK_EQ(page_encoding_, ENCODING);
+  if (ENCODING == Encoding::PLAIN_DICTIONARY) {
+    if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
+      SetDictDecodeError();
+      return false;
+    }
+  } else {
+    DCHECK_EQ(ENCODING, Encoding::PLAIN);
+    int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
+        *data, data_end, fixed_len_size_, val);
+    if (UNLIKELY(encoded_len < 0)) {
+      SetPlainDecodeError();
+      return false;
+    }
+    *data += encoded_len;
+  }
+  return true;
+}
+
+// Specialise for decoding INT64 timestamps from PLAIN encoding, which needs to call
+// out to 'timestamp_decoder_'.
+template <>
+template <>
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT64,
+    true>::DecodeValue<Encoding::PLAIN>(uint8_t** RESTRICT data,
+    const uint8_t* RESTRICT data_end, TimestampValue* RESTRICT val) RESTRICT {
+  DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+  int encoded_len = timestamp_decoder_.Decode<parquet::Type::INT64>(*data, data_end, val);
+  if (UNLIKELY(encoded_len < 0)) {
+    SetPlainDecodeError();
+    return false;
+  }
+  *data += encoded_len;
+  return true;
+}
+
+template <>
+template <Encoding::type ENCODING>
+bool ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::DecodeValue(
+    uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
+    bool* RESTRICT value) RESTRICT {
+  if (UNLIKELY(!bool_decoder_->DecodeValue<ENCODING>(value))) {
+    SetBoolDecodeError();
+    return false;
+  }
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
+    int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT {
+  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+    return DecodeValues<Encoding::PLAIN_DICTIONARY>(stride, count, out_vals);
+  } else {
+    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+    return DecodeValues<Encoding::PLAIN>(stride, count, out_vals);
+  }
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+template <Encoding::type ENCODING>
+bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
+    int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT {
+  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+    if (UNLIKELY(!dict_decoder_.GetNextValues(out_vals, stride, count))) {
+      SetDictDecodeError();
+      return false;
+    }
+  } else {
+    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+    int64_t encoded_len = ParquetPlainEncoder::DecodeBatch<InternalType, PARQUET_TYPE>(
+        data_, data_end_, fixed_len_size_, count, stride, out_vals);
+    if (UNLIKELY(encoded_len < 0)) {
+      SetPlainDecodeError();
+      return false;
+    }
+    data_ += encoded_len;
+  }
+  return true;
+}
+
+// Specialise for decoding INT64 timestamps from PLAIN encoding, which needs to call
+// out to 'timestamp_decoder_'.
+template <>
+template <>
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT64,
+    true>::DecodeValues<Encoding::PLAIN>(int64_t stride, int64_t count,
+    TimestampValue* RESTRICT out_vals) RESTRICT {
+  DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+  int64_t encoded_len = timestamp_decoder_.DecodeBatch<parquet::Type::INT64>(
+      data_, data_end_, count, stride, out_vals);
+  if (UNLIKELY(encoded_len < 0)) {
+    SetPlainDecodeError();
+    return false;
+  }
+  data_ += encoded_len;
+  return true;
+}
+
+template <>
+bool ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::DecodeValues(
+    int64_t stride, int64_t count, bool* RESTRICT out_vals) RESTRICT {
+  if (!bool_decoder_->DecodeValues(stride, count, out_vals)) {
+    SetBoolDecodeError();
+    return false;
+  }
+  return true;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositionBatched(
+    int16_t rep_level, int64_t* pos) {
+  // Reset position counter if we are at the start of a new parent collection.
+  if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
+  *pos = pos_current_value_++;
+}
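+
+// Example (illustrative): for a column nested in a single collection
+// (max_rep_level() == 1), rep levels [0, 1, 1, 0, 1] produce positions
+// [0, 1, 2, 0, 1]: the counter resets whenever rep_level < max_rep_level(),
+// i.e. at the first value of each new collection instance.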
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositions(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  const int pos_slot_offset = pos_slot_desc()->tuple_offset();
+  void* first_slot = reinterpret_cast<Tuple*>(tuple_mem)->GetSlot(pos_slot_offset);
+  StrideWriter<int64_t> out{reinterpret_cast<int64_t*>(first_slot), tuple_size};
+  for (int64_t i = 0; i < num_to_read; ++i) {
+    ReadPositionBatched(rep_levels_.CacheGetNext(), out.Advance());
+  }
+}
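+
+// The loop above writes one int64_t position per output tuple; StrideWriter just
+// advances the destination pointer by 'tuple_size' bytes after each write. A minimal
+// sketch of the same pattern (simplified, hypothetical) would be:
+//
+//   uint8_t* out = tuple_mem + pos_slot_offset;
+//   for (int64_t i = 0; i < num_to_read; ++i, out += tuple_size) {
+//     ReadPositionBatched(rep_levels_.CacheGetNext(), reinterpret_cast<int64_t*>(out));
+//   }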
+
+template <>
+inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY,
+    true>::NeedsConversionInline() const {
+  return needs_conversion_;
+}
+
+template <>
+bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSlot(
+    const StringValue* src, void* slot) {
+  DCHECK(slot_desc() != nullptr);
+  DCHECK(slot_desc()->type().type == TYPE_CHAR);
+  int char_len = slot_desc()->type().len;
+  int unpadded_len = min(char_len, src->len);
+  char* dst_char = reinterpret_cast<char*>(slot);
+  memcpy(dst_char, src->ptr, unpadded_len);
+  StringValue::PadWithSpaces(dst_char, char_len, unpadded_len);
+  return true;
+}
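+
+// Example (illustrative): converting the StringValue "abc" (len 3) into a CHAR(5) slot
+// copies three bytes and pads the remaining two with spaces, producing "abc  "; a
+// longer value such as "abcdefg" would be truncated to "abcde".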
+
+template <>
+inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
+::NeedsConversionInline() const {
+  return needs_conversion_;
+}
+
+template <>
+inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
+::NeedsConversionInline() const {
+  return needs_conversion_;
+}
+
+template <>
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot(
+    const TimestampValue* src, void* slot) {
+  // Conversion should only happen when this flag is enabled.
+  DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
+  DCHECK(timestamp_decoder_.NeedsConversion());
+  TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
+  *dst_ts = *src;
+  // TODO: IMPALA-7862: converting timestamps after validating them can move them out of
+  // range. We should either validate after conversion or require conversion to produce an
+  // in-range value.
+  timestamp_decoder_.ConvertToLocalTime(dst_ts);
+  return true;
+}
+
+template <>
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::ConvertSlot(
+    const TimestampValue* src, void* slot) {
+  DCHECK(timestamp_decoder_.NeedsConversion());
+  TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
+  *dst_ts = *src;
+  // TODO: IMPALA-7862: converting timestamps after validating them can move them out of
+  // range. We should either validate after conversion or require conversion to produce an
+  // in-range value.
+  timestamp_decoder_.ConvertToLocalTime(static_cast<TimestampValue*>(dst_ts));
+  return true;
+}
+
+template <>
+inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
+::NeedsValidationInline() const {
+  return true;
+}
+
+template <>
+inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
+::NeedsValidationInline() const {
+  return true;
+}
+
+template <>
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateValue(
+    TimestampValue* val) const {
+  if (UNLIKELY(!TimestampValue::IsValidDate(val->date())
+      || !TimestampValue::IsValidTime(val->time()))) {
+    // If both are corrupt, an invalid time takes precedence over an invalid date,
+    // because an invalid date may come from a more or less functional encoder that
+    // does not respect the 1400..9999 limit, while an invalid time is a good indicator
+    // of a buggy encoder or memory garbage.
+    TErrorCode::type errorCode = TimestampValue::IsValidTime(val->time())
+        ? TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE
+        : TErrorCode::PARQUET_TIMESTAMP_INVALID_TIME_OF_DAY;
+    ErrorMsg msg(errorCode, filename(), node_.element->name);
+    Status status = parent_->state_->LogOrReturnError(msg);
+    if (!status.ok()) parent_->parse_status_ = status;
+    return false;
+  }
+  return true;
+}
+
+template <>
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::ValidateValue(
+    TimestampValue* val) const {
+  // The range was already checked during the int64_t->TimestampValue conversion, which
+  // sets the date to invalid if it was out of range.
+  if (UNLIKELY(!val->HasDate())) {
+    ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE,
+        filename(), node_.element->name);
+    Status status = parent_->state_->LogOrReturnError(msg);
+    if (!status.ok()) parent_->parse_status_ = status;
+    return false;
+  }
+  DCHECK(TimestampValue::IsValidDate(val->date()));
+  DCHECK(TimestampValue::IsValidTime(val->time()));
+  return true;
+}
+
+// In Impala 1.1, we had a bug where the dictionary page metadata was not set. Returns
+// true if this matches those versions and compatibility workarounds need to be used.
+static bool RequiresSkippedDictionaryHeaderCheck(
+    const ParquetFileVersion& v) {
+  if (v.application != "impala") return false;
+  return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
+}
+
+Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
+    const parquet::ColumnChunk& col_chunk, int row_group_idx) {
+  // Ensure metadata is valid before using it to initialize the reader.
+  RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_,
+      parent_->filename(), row_group_idx, col_idx(), schema_element(),
+      parent_->state_));
+  num_buffered_values_ = 0;
+  data_ = nullptr;
+  data_end_ = nullptr;
+  stream_ = nullptr;
+  io_reservation_ = 0;
+  metadata_ = &col_chunk.meta_data;
+  num_values_read_ = 0;
+  def_level_ = ParquetLevel::INVALID_LEVEL;
+  // See ColumnReader constructor.
+  rep_level_ = max_rep_level() == 0 ? 0 : ParquetLevel::INVALID_LEVEL;
+  pos_current_value_ = ParquetLevel::INVALID_POS;
+
+  if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
+    RETURN_IF_ERROR(Codec::CreateDecompressor(
+        nullptr, false, ConvertParquetToImpalaCodec(metadata_->codec), &decompressor_));
+  }
+  int64_t col_start = col_chunk.meta_data.data_page_offset;
+  if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+    // Already validated in ValidateColumnOffsets()
+    DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
+    col_start = col_chunk.meta_data.dictionary_page_offset;
+  }
+  int64_t col_len = col_chunk.meta_data.total_compressed_size;
+  if (col_len <= 0) {
+    return Status(Substitute("File '$0' contains invalid column chunk size: $1",
+        filename(), col_len));
+  }
+  int64_t col_end = col_start + col_len;
+
+  // Already validated in ValidateColumnOffsets()
+  DCHECK_GT(col_end, 0);
+  DCHECK_LT(col_end, file_desc.file_length);
+  const ParquetFileVersion& file_version = parent_->file_version_;
+  if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) {
+    // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+    // dictionary page header size in total_compressed_size and total_uncompressed_size
+    // (see IMPALA-694). We pad col_len to compensate.
+    int64_t bytes_remaining = file_desc.file_length - col_end;
+    int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
+    col_len += pad;
+  }
+
+  // TODO: this will need to change when we have co-located files and the columns
+  // are in different files.
+  if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
+    return Status(Substitute("Expected parquet column file path '$0' to match "
+        "filename '$1'", col_chunk.file_path, filename()));
+  }
+
+  const ScanRange* metadata_range = parent_->metadata_range_;
+  int64_t partition_id = parent_->context_->partition_descriptor()->id();
+  const ScanRange* split_range =
+      static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
+  // Determine if the column is completely contained within a local split.
+  bool col_range_local = split_range->expected_local()
+      && col_start >= split_range->offset()
+      && col_end <= split_range->offset() + split_range->len();
+  scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
+      filename(), col_len, col_start, partition_id, split_range->disk_id(),
+      col_range_local,
+      BufferOpts(split_range->try_cache(), file_desc.mtime));
+  ClearDictionaryDecoder();
+  return Status::OK();
+}
+
+void BaseScalarColumnReader::Close(RowBatch* row_batch) {
+  if (row_batch != nullptr && PageContainsTupleData(page_encoding_)) {
+    row_batch->tuple_data_pool()->AcquireData(data_page_pool_.get(), false);
+  } else {
+    data_page_pool_->FreeAll();
+  }
+  if (decompressor_ != nullptr) decompressor_->Close();
+  DictDecoderBase* dict_decoder = GetDictionaryDecoder();
+  if (dict_decoder != nullptr) dict_decoder->Close();
+}
+
+Status BaseScalarColumnReader::StartScan() {
+  DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan.";
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  ScannerContext* context = parent_->context_;
+  DCHECK_GT(io_reservation_, 0);
+  bool needs_buffers;
+  RETURN_IF_ERROR(parent_->scan_node_->reader_context()->StartScanRange(
+      scan_range_, &needs_buffers));
+  if (needs_buffers) {
+    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
+        context->bp_client(), scan_range_, io_reservation_));
+  }
+  stream_ = parent_->context_->AddStream(scan_range_, io_reservation_);
+  DCHECK(stream_ != nullptr);
+  return Status::OK();
+}
+
+Status BaseScalarColumnReader::ReadPageHeader(bool peek,
+    parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) {
+  DCHECK(stream_ != nullptr);
+  *eos = false;
+
+  uint8_t* buffer;
+  int64_t buffer_size;
+  RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
+  // check for end of stream
+  if (buffer_size == 0) {
+    // The data pages contain fewer values than stated in the column metadata.
+    DCHECK(stream_->eosr());
+    DCHECK_LT(num_values_read_, metadata_->num_values);
+    // TODO for 2.3: node_.element->name isn't necessarily useful
+    ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID, metadata_->num_values,
+        num_values_read_, node_.element->name, filename());
+    RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
+    *eos = true;
+    return Status::OK();
+  }
+
+  // We don't know the actual header size until the thrift object is deserialized.  Loop
+  // until we successfully deserialize the header or exceed the maximum header size.
+  uint32_t header_size;
+  Status status;
+  while (true) {
+    header_size = buffer_size;
+    status = DeserializeThriftMsg(buffer, &header_size, true, next_page_header);
+    if (status.ok()) break;
+
+    if (buffer_size >= FLAGS_max_page_header_size) {
+      stringstream ss;
+      ss << "ParquetScanner: could not read data page because page header exceeded "
+         << "maximum size of "
+         << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
+      status.AddDetail(ss.str());
+      return status;
+    }
+
+    // Didn't read entire header, increase buffer size and try again
+    int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
+    status = Status::OK();
+    bool success = stream_->GetBytes(
+        new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
+    if (!success) {
+      DCHECK(!status.ok());
+      return status;
+    }
+    DCHECK(status.ok());
+
+    // Even though we increased the allowed buffer size, the number of bytes
+    // read did not change. The header is not limited by the buffer space,
+    // so it must be incomplete in the file.
+    if (buffer_size == new_buffer_size) {
+      DCHECK_NE(new_buffer_size, 0);
+      return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
+    }
+    DCHECK_GT(new_buffer_size, buffer_size);
+    buffer_size = new_buffer_size;
+  }
+
+  *next_header_size = header_size;
+
+  // Successfully deserialized current_page_header_
+  if (!peek && !stream_->SkipBytes(header_size, &status)) return status;
+
+  int data_size = next_page_header->compressed_page_size;
+  if (UNLIKELY(data_size < 0)) {
+    return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for "
+        "column '$2'", filename(), data_size, schema_element().name));
+  }
+  int uncompressed_size = next_page_header->uncompressed_page_size;
+  if (UNLIKELY(uncompressed_size < 0)) {
+    return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page "
+        "size $1 for column '$2'", filename(), uncompressed_size,
+        schema_element().name));
+  }
+
+  return Status::OK();
+}
+
+Status BaseScalarColumnReader::InitDictionary() {
+  // Peek at the next page header
+  bool eos;
+  parquet::PageHeader next_page_header;
+  uint32_t next_header_size;
+  DCHECK(stream_ != nullptr);
+  DCHECK(!HasDictionaryDecoder());
+
+  RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header,
+                                 &next_header_size, &eos));
+  if (eos) return Status::OK();
+  // The dictionary must be the first data page, so if the first page
+  // is not a dictionary, then there is no dictionary.
+  if (next_page_header.type != parquet::PageType::DICTIONARY_PAGE) return Status::OK();
+
+  current_page_header_ = next_page_header;
+  Status status;
+  if (!stream_->SkipBytes(next_header_size, &status)) return status;
+
+  int data_size = current_page_header_.compressed_page_size;
+  if (slot_desc_ == nullptr) {
+    // Skip processing the dictionary page if we don't need to decode any values. In
+    // addition to being unnecessary, we are likely unable to successfully decode the
+    // dictionary values because we don't necessarily create the right type of scalar
+    // reader if there's no slot to read into (see CreateReader()).
+    if (!stream_->SkipBytes(data_size, &status)) return status;
+    return Status::OK();
+  }
+
+  if (node_.element->type == parquet::Type::BOOLEAN) {
+    return Status("Unexpected dictionary page. Dictionary page is not"
+       " supported for booleans.");
+  }
+
+  const parquet::DictionaryPageHeader* dict_header = nullptr;
+  if (current_page_header_.__isset.dictionary_page_header) {
+    dict_header = &current_page_header_.dictionary_page_header;
+  } else {
+    if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
+      return Status("Dictionary page does not have dictionary header set.");
+    }
+  }
+  if (dict_header != nullptr &&
+      dict_header->encoding != Encoding::PLAIN &&
+      dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
+    return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
+                  "for dictionary pages.");
+  }
+
+  if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+  data_end_ = data_ + data_size;
+
+  // The size of the dictionary can be 0 if every value is null. The dictionary still
+  // has to be reset in this case.
+  DictDecoderBase* dict_decoder;
+  if (current_page_header_.uncompressed_page_size == 0) {
+    return CreateDictionaryDecoder(nullptr, 0, &dict_decoder);
+  }
+
+  // There are 3 different cases from the aspect of memory management:
+  // 1. If the column type is string, the dictionary will contain pointers to a buffer,
+  //    so the buffer's lifetime must be as long as any row batch that references it.
+  // 2. If the column type is not string, and the dictionary page is compressed, then a
+  //    temporary buffer is needed for the uncompressed values.
+  // 3. If the column type is not string, and the dictionary page is not compressed,
+  //    then no buffer is necessary.
+  ScopedBuffer uncompressed_buffer(parent_->dictionary_pool_->mem_tracker());
+  uint8_t* dict_values = nullptr;
+  if (decompressor_.get() != nullptr || slot_desc_->type().IsStringType()) {
+    int buffer_size = current_page_header_.uncompressed_page_size;
+    if (slot_desc_->type().IsStringType()) {
+      dict_values = parent_->dictionary_pool_->TryAllocate(buffer_size); // case 1.
+    } else if (uncompressed_buffer.TryAllocate(buffer_size)) {
+      dict_values = uncompressed_buffer.buffer(); // case 2
+    }
+    if (UNLIKELY(dict_values == nullptr)) {
+      string details = Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "InitDictionary",
+          buffer_size, "dictionary");
+      return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
+               parent_->state_, details, buffer_size);
+    }
+  } else {
+    dict_values = data_; // case 3.
+  }
+
+  if (decompressor_.get() != nullptr) {
+    int uncompressed_size = current_page_header_.uncompressed_page_size;
+    RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
+                    &uncompressed_size, &dict_values));
+    VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
+    if (current_page_header_.uncompressed_page_size != uncompressed_size) {
+      return Status(Substitute("Error decompressing dictionary page in file '$0'. "
+               "Expected $1 uncompressed bytes but got $2", filename(),
+               current_page_header_.uncompressed_page_size, uncompressed_size));
+    }
+  } else {
+    if (current_page_header_.uncompressed_page_size != data_size) {
+      return Status(Substitute("Error reading dictionary page in file '$0'. "
+                               "Expected $1 bytes but got $2", filename(),
+                               current_page_header_.uncompressed_page_size, data_size));
+    }
+    if (slot_desc_->type().IsStringType()) memcpy(dict_values, data_, data_size);
+  }
+
+  RETURN_IF_ERROR(CreateDictionaryDecoder(
+      dict_values, current_page_header_.uncompressed_page_size, &dict_decoder));
+  if (dict_header != nullptr &&
+      dict_header->num_values != dict_decoder->num_entries()) {
+    return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+                  slot_desc_->type().DebugString(),
+                  Substitute("Expected $0 entries but data contained $1 entries",
+                             dict_header->num_values, dict_decoder->num_entries()));
+  }
+
+  return Status::OK();
+}
+
+Status BaseScalarColumnReader::InitDictionaries(
+    const vector<BaseScalarColumnReader*> readers) {
+  for (BaseScalarColumnReader* reader : readers) {
+    RETURN_IF_ERROR(reader->InitDictionary());
+  }
+  return Status::OK();
+}
+
+Status BaseScalarColumnReader::ReadDataPage() {
+  // We're about to move to the next data page.  The previous data page is
+  // now complete, so free up any memory allocated for it. If the data page contained
+  // strings we need to attach it to the returned batch.
+  if (PageContainsTupleData(page_encoding_)) {
+    parent_->scratch_batch_->aux_mem_pool.AcquireData(data_page_pool_.get(), false);
+  } else {
+    data_page_pool_->FreeAll();
+  }
+  // We don't hold any pointers to earlier pages in the stream - we can safely free
+  // any I/O or boundary buffer.
+  stream_->ReleaseCompletedResources(false);
+
+  // Read the next data page, skipping page types we don't care about.
+  // We break out of this loop on the non-error case (a data page was found or we read all
+  // the pages).
+  while (true) {
+    DCHECK_EQ(num_buffered_values_, 0);
+    if (num_values_read_ == metadata_->num_values) {
+      // No more pages to read
+      // TODO: should we check for stream_->eosr()?
+      break;
+    } else if (num_values_read_ > metadata_->num_values) {
+      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
+          metadata_->num_values, num_values_read_, node_.element->name, filename());
+      RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
+      return Status::OK();
+    }
+
+    bool eos;
+    uint32_t header_size;
+    RETURN_IF_ERROR(ReadPageHeader(false /* peek */, &current_page_header_,
+                                   &header_size, &eos));
+    if (eos) return Status::OK();
+
+    if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
+      // Any dictionary is already initialized because InitDictionary() has already
+      // been called. There are two possibilities:
+      // 1. The Parquet file has more than one dictionary page, or
+      // 2. the dictionary page is not the first page of the column chunk.
+      // Both are errors in the Parquet file.
+      if (HasDictionaryDecoder()) {
+        return Status(Substitute("Corrupt Parquet file '$0': multiple dictionary pages "
+            "for column '$1'", filename(), schema_element().name));
+      } else {
+        return Status(Substitute("Corrupt Parquet file: '$0': dictionary page for "
+            "column '$1' is not the first page", filename(), schema_element().name));
+      }
+    }
+
+    Status status;
+    int data_size = current_page_header_.compressed_page_size;
+    if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
+      // We can safely skip non-data pages
+      if (!stream_->SkipBytes(data_size, &status)) {
+        DCHECK(!status.ok());
+        return status;
+      }
+      continue;
+    }
+
+    // Read Data Page
+    // TODO: when we start using page statistics, we will need to ignore certain corrupt
+    // statistics. See IMPALA-2208 and PARQUET-251.
+    if (!stream_->ReadBytes(data_size, &data_, &status)) {
+      DCHECK(!status.ok());
+      return status;
+    }
+    data_end_ = data_ + data_size;
+    int num_values = current_page_header_.data_page_header.num_values;
+    if (num_values < 0) {
+      return Status(Substitute("Error reading data page in Parquet file '$0'. "
+          "Invalid number of values in metadata: $1", filename(), num_values));
+    }
+    num_buffered_values_ = num_values;
+    num_values_read_ += num_buffered_values_;
+
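+    // Decompress the page if a codec is configured for this column. Otherwise validate
+    // the advertised size and, if the page's values will be referenced directly by
+    // output batches, copy the data out of the I/O buffer.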
+    int uncompressed_size = current_page_header_.uncompressed_page_size;
+    if (decompressor_.get() != nullptr) {
+      SCOPED_TIMER(parent_->decompress_timer_);
+      uint8_t* decompressed_buffer;
+      RETURN_IF_ERROR(AllocateUncompressedDataPage(
+            uncompressed_size, "decompressed data", &decompressed_buffer));
+      RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
+          current_page_header_.compressed_page_size, data_, &uncompressed_size,
+          &decompressed_buffer));
+      VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
+                << " to " << uncompressed_size;
+      if (current_page_header_.uncompressed_page_size != uncompressed_size) {
+        return Status(Substitute("Error decompressing data page in file '$0'. "
+            "Expected $1 uncompressed bytes but got $2", filename(),
+            current_page_header_.uncompressed_page_size, uncompressed_size));
+      }
+      data_ = decompressed_buffer;
+      data_size = current_page_header_.uncompressed_page_size;
+      data_end_ = data_ + data_size;
+    } else {
+      DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
+      if (current_page_header_.compressed_page_size != uncompressed_size) {
+        return Status(Substitute("Error reading data page in file '$0'. "
+            "Expected $1 bytes but got $2", filename(),
+            current_page_header_.compressed_page_size, uncompressed_size));
+      }
+      if (PageContainsTupleData(current_page_header_.data_page_header.encoding)) {
+        // In this case returned batches will have pointers into the data page itself.
+        // We don't transfer disk I/O buffers out of the scanner, so we need to copy
+        // the page data into a buffer that can be attached to output batches.
+        uint8_t* copy_buffer;
+        RETURN_IF_ERROR(AllocateUncompressedDataPage(
+              uncompressed_size, "uncompressed variable-length data", &copy_buffer));
+        memcpy(copy_buffer, data_, uncompressed_size);
+        data_ = copy_buffer;
+        data_end_ = data_ + uncompressed_size;
+      }
+    }
+
+    // Initialize the repetition level data
+    RETURN_IF_ERROR(rep_levels_.Init(filename(),
+        current_page_header_.data_page_header.repetition_level_encoding,
+        parent_->perm_pool_.get(), parent_->state_->batch_size(), max_rep_level(), &data_,
+        &data_size));
+
+    // Initialize the definition level data
+    RETURN_IF_ERROR(def_levels_.Init(filename(),
+        current_page_header_.data_page_header.definition_level_encoding,
+        parent_->perm_pool_.get(), parent_->state_->batch_size(), max_def_level(), &data_,
+        &data_size));
+
+    // Data can be empty if the column contains all NULLs
+    RETURN_IF_ERROR(InitDataPage(data_, data_size));
+    break;
+  }
+
+  return Status::OK();
+}
+
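+// Allocates 'size' bytes for an uncompressed data page from 'data_page_pool_',
+// returning a memory-limit-exceeded error that mentions 'err_ctx' if the allocation
+// fails.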
+Status BaseScalarColumnReader::AllocateUncompressedDataPage(int64_t size,
+    const char* err_ctx, uint8_t** buffer) {
+  *buffer = data_page_pool_->TryAllocate(size);
+  if (*buffer == nullptr) {
+    string details =
+        Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "ReadDataPage", size, err_ctx);
+    return data_page_pool_->mem_tracker()->MemLimitExceeded(
+        parent_->state_, details, size);
+  }
+  return Status::OK();
+}
+
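+// The ADVANCE_REP_LEVEL template parameter lets callers that know max_rep_level() == 0
+// (see the DCHECK below) instantiate a version of NextLevels() in which the
+// repetition-level branch compiles away.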
+template <bool ADVANCE_REP_LEVEL>
+bool BaseScalarColumnReader::NextLevels() {
+  if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
+
+  if (UNLIKELY(num_buffered_values_ == 0)) {
+    if (!NextPage()) return parent_->parse_status_.ok();
+  }
+  --num_buffered_values_;
+  DCHECK_GE(num_buffered_values_, 0);
+
+  // The definition level is not present if the column and all enclosing structs are
+  // required, i.e. max_def_level() == 0.
+  def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
+  // The compiler can optimize these two conditions into a single branch by treating
+  // def_level_ as unsigned.
+  if (UNLIKELY(def_level_ < 0 || def_level_ > max_def_level())) {
+    SetLevelDecodeError("def", def_level_, max_def_level());
+    return false;
+  }
+
+  if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
+    // The repetition level is only present if this column is nested inside a
+    // collection type.
+    rep_level_ = rep_levels_.ReadLevel();
+    if (UNLIKELY(rep_level_ < 0 || rep_level_ > max_rep_level())) {
+      SetLevelDecodeError("rep", rep_level_, max_rep_level());
+      return false;
+    }
+    // Reset the position counter if we are at the start of a new parent collection,
+    // i.e. the repetition level is below the innermost level.
+    if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
+  }
+
+  return parent_->parse_status_.ok();
+}
+
+Status BaseScalarColumnReader::GetUnsupportedDecodingError() {
+  return Status(Substitute(
+      "File '$0' is corrupt: unexpected encoding: $1 for data page of column '$2'.",
+      filename(), PrintThriftEnum(page_encoding_), schema_element().name));
+}
+
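+// Reads the next data page and stores the result in parent_->parse_status_. Returns
+// false on error or when the column has no more pages in this row group; in the latter
+// case the level state is set to ROW_GROUP_END so that callers can detect it.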
+bool BaseScalarColumnReader::NextPage() {
+  parent_->assemble_rows_timer_.Stop();
+  parent_->parse_status_ = ReadDataPage();
+  if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+  if (num_buffered_values_ == 0) {
+    rep_level_ = ParquetLevel::ROW_GROUP_END;
+    def_level_ = ParquetLevel::ROW_GROUP_END;
+    pos_current_value_ = ParquetLevel::INVALID_POS;
+    return false;
+  }
+  parent_->assemble_rows_timer_.Start();
+  return true;
+}
+
+void BaseScalarColumnReader::SetLevelDecodeError(
+    const char* level_name, int decoded_level, int max_level) {
+  if (decoded_level < 0) {
+    DCHECK_EQ(decoded_level, ParquetLevel::INVALID_LEVEL);
+    parent_->parse_status_.MergeStatus(
+        Status(Substitute("Corrupt Parquet file '$0': "
+                          "could not read all $1 levels for column '$2'",
+            filename(), level_name, schema_element().name)));
+  } else {
+    parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file '$0': "
+        "invalid $1 level $2 > max $1 level $3 for column '$4'", filename(),
+        level_name, decoded_level, max_level, schema_element().name)));
+  }
+}
+
+/// Returns a column reader for decimal types based on its size and parquet type.
+static ParquetColumnReader* CreateDecimalColumnReader(
+    const SchemaNode& node, const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
+  switch (node.element->type) {
+    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
+      switch (slot_desc->type().GetByteSize()) {
+      case 4:
+        return new ScalarColumnReader<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
+            true>(parent, node, slot_desc);
+      case 8:
+        return new ScalarColumnReader<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
+            true>(parent, node, slot_desc);
+      case 16:
+        return new ScalarColumnReader<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
+            true>(parent, node, slot_desc);
+      }
+      break;
+    case parquet::Type::BYTE_ARRAY:
+      switch (slot_desc->type().GetByteSize()) {
+      case 4:
+        return new ScalarColumnReader<Decimal4Value, parquet::Type::BYTE_ARRAY, true>(
+            parent, node, slot_desc);
+      case 8:
+        return new ScalarColumnReader<Decimal8Value, parquet::Type::BYTE_ARRAY, true>(
+            parent, node, slot_desc);
+      case 16:
+        return new ScalarColumnReader<Decimal16Value, parquet::Type::BYTE_ARRAY, true>(
+            parent, node, slot_desc);
+      }
+      break;
+    case parquet::Type::INT32:
+      DCHECK_EQ(sizeof(Decimal4Value::StorageType), slot_desc->type().GetByteSize());
+      return new ScalarColumnReader<Decimal4Value, parquet::Type::INT32, true>(
+          parent, node, slot_desc);
+    case parquet::Type::INT64:
+      DCHECK_EQ(sizeof(Decimal8Value::StorageType), slot_desc->type().GetByteSize());
+      return new ScalarColumnReader<Decimal8Value, parquet::Type::INT64, true>(
+          parent, node, slot_desc);
+    default:
+      DCHECK(false) << "Invalid decimal primitive type";
+  }
+  DCHECK(false) << "Invalid decimal type";
+  return nullptr;
+}
+
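+// Factory for column readers: collection columns get a CollectionColumnReader,
+// materialized scalar columns get a ScalarColumnReader templated on the in-memory and
+// on-disk types, and a NULL 'slot_desc' yields a dummy reader that only tracks rep and
+// def levels.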
+ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
+    bool is_collection_field, const SlotDescriptor* slot_desc,
+    HdfsParquetScanner* parent) {
+  if (is_collection_field) {
+    // Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
+    return new CollectionColumnReader(parent, node, slot_desc);
+  } else if (slot_desc != nullptr) {
+    // Create the appropriate ScalarColumnReader type to read values into 'slot_desc'
+    switch (slot_desc->type().type) {
+      case TYPE_BOOLEAN:
+        return new ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>(
+            parent, node, slot_desc);
+      case TYPE_TINYINT:
+        return new ScalarColumnReader<int8_t, parquet::Type::INT32, true>(
+            parent, node, slot_desc);
+      case TYPE_SMALLINT:
+        return new ScalarColumnReader<int16_t, parquet::Type::INT32, true>(parent, node,
+            slot_desc);
+      case TYPE_INT:
+        return new ScalarColumnReader<int32_t, parquet::Type::INT32, true>(parent, node,
+            slot_desc);
+      case TYPE_BIGINT:
+        switch (node.element->type) {
+          case parquet::Type::INT32:
+            return new ScalarColumnReader<int64_t, parquet::Type::INT32, true>(parent,
+                node, slot_desc);
+          default:
+            return new ScalarColumnReader<int64_t, parquet::Type::INT64, true>(parent,
+                node, slot_desc);
+        }
+      case TYPE_FLOAT:
+        return new ScalarColumnReader<float, parquet::Type::FLOAT, true>(parent, node,
+            slot_desc);
+      case TYPE_DOUBLE:
+        switch (node.element->type) {
+          case parquet::Type::INT32:
+            return new ScalarColumnReader<double, parquet::Type::INT32, true>(parent,
+                node, slot_desc);
+          case parquet::Type::FLOAT:
+            return new ScalarColumnReader<double, parquet::Type::FLOAT, true>(parent,
+                node, slot_desc);
+          default:
+            return new ScalarColumnReader<double, parquet::Type::DOUBLE, true>(parent,
+                node, slot_desc);
+        }
+      case TYPE_TIMESTAMP:
+        return CreateTimestampColumnReader(node, slot_desc, parent);
+      case TYPE_STRING:
+      case TYPE_VARCHAR:
+      case TYPE_CHAR:
+        return new ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>(
+            parent, node, slot_desc);
+      case TYPE_DECIMAL:
+        return CreateDecimalColumnReader(node, slot_desc, parent);
+      default:
+        DCHECK(false) << slot_desc->type().DebugString();
+        return nullptr;
+    }
+  } else {
+    // Special case for counting scalar values (e.g. count(*)): no columns are
+    // materialized from the file and at most a position slot is materialized. We won't
+    // actually read any values, only the rep and def levels, so it doesn't matter what
+    // kind of reader we make.
+    return new ScalarColumnReader<int8_t, parquet::Type::INT32, false>(parent, node,
+        slot_desc);
+  }
+}
+
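+// Timestamps can be stored physically as INT96 (the layout historically written by
+// Impala and Hive) or as INT64; the reader is chosen from the physical type in the
+// file schema.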
+ParquetColumnReader* ParquetColumnReader::CreateTimestampColumnReader(
+    const SchemaNode& node, const SlotDescriptor* slot_desc,
+    HdfsParquetScanner* parent) {
+  if (node.element->type == parquet::Type::INT96) {
+    return new ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>(
+        parent, node, slot_desc);
+  } else if (node.element->type == parquet::Type::INT64) {
+    return new ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>(
+        parent, node, slot_desc);
+  }
+  DCHECK(false) << slot_desc->type().DebugString();
+  return nullptr;
+}
+
+} // namespace impala