Posted to commits@impala.apache.org by cs...@apache.org on 2019/09/26 17:57:49 UTC

[impala] branch master updated: IMPALA-6433: Part 1: Extract page reading logic from ParquetColumnReader

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b7c081c  IMPALA-6433: Part 1: Extract page reading logic from ParquetColumnReader
b7c081c is described below

commit b7c081cb9966166be8d20d6b75a541394a001ddc
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Tue May 14 12:00:39 2019 +0200

    IMPALA-6433: Part 1: Extract page reading logic from ParquetColumnReader
    
    Moved some responsibilities from parquet-column-readers.cc to
    new classes 'ParquetColumnChunkReader' and 'ParquetPageReader':
    - reading pages from ScanRange
    - decompressing data if needed
    
    The main motivation is to make the implementation of V2 data page
    reading simpler by moving most parts that will differ between V1 and V2
    into a class with manageable complexity.
    
    Testing:
    - ran Parquet-related scanner tests
    
    Change-Id: Ic0289394adcb97a3529313030930c9c5b85aaa12
    Reviewed-on: http://gerrit.cloudera.org:8080/13329
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
---
 be/src/exec/parquet/CMakeLists.txt                 |   2 +
 be/src/exec/parquet/hdfs-parquet-scanner.h         |   3 +
 be/src/exec/parquet/parquet-column-chunk-reader.cc | 308 ++++++++++++
 be/src/exec/parquet/parquet-column-chunk-reader.h  | 172 +++++++
 be/src/exec/parquet/parquet-column-readers.cc      | 554 ++++-----------------
 be/src/exec/parquet/parquet-column-readers.h       |  64 +--
 be/src/exec/parquet/parquet-page-reader.cc         | 249 +++++++++
 be/src/exec/parquet/parquet-page-reader.h          | 126 +++++
 8 files changed, 973 insertions(+), 505 deletions(-)
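
The new reader is intended to be driven in a fixed sequence: InitColumnChunk() for each
row group, then set_io_reservation() and StartScan(), then TryReadDictionaryPage() once
followed by ReadNextDataPage() in a loop, with ReleaseResourcesOfLastPage() between pages
and Close() at the end. The following is a minimal illustrative sketch of that sequence,
not code from this commit: ReadWholeColumnChunk() is a hypothetical helper, the 8 MB I/O
reservation and the empty sub-range list are placeholders, and the Impala-internal types
(HdfsFileDesc, MemPool, MemTracker, ScopedBuffer, Status) are assumed from the headers
touched by this patch; other includes are omitted.

#include "exec/parquet/parquet-column-chunk-reader.h"  // added by this patch
#include "runtime/mem-pool.h"

namespace impala {

// Hypothetical helper, for illustration only: reads every page of one column chunk.
Status ReadWholeColumnChunk(ParquetColumnChunkReader* reader,
    const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk,
    int row_group_idx, MemTracker* tracker, MemPool* batch_pool) {
  // 1. Reset the reader for this row group; creates the scan range but does not start it.
  RETURN_IF_ERROR(reader->InitColumnChunk(file_desc, col_chunk, row_group_idx,
      {} /* no sub-ranges: read the whole chunk */));
  // 2. Assign an I/O reservation (placeholder value), then start the scan range.
  reader->set_io_reservation(8 * 1024 * 1024);
  RETURN_IF_ERROR(reader->StartScan());

  // 3. A dictionary page, if present, must be the first page of the chunk.
  bool is_dict = false;
  bool eos = false;
  ScopedBuffer decompress_buffer(tracker);
  uint8_t* dict_values = nullptr;
  int64_t dict_size = 0;
  int num_entries = 0;
  RETURN_IF_ERROR(reader->TryReadDictionaryPage(&is_dict, &eos, /* skip_data */ false,
      &decompress_buffer, &dict_values, &dict_size, &num_entries));
  // ... if is_dict, initialize a dictionary decoder from dict_values/num_entries ...

  // 4. Read data pages; non-data pages are skipped inside ReadNextDataPage(). The real
  //    column reader stops once the value count from the column metadata is reached and
  //    treats an early end-of-stream as corrupt metadata; this sketch just reads to eos.
  while (!eos) {
    uint8_t* data = nullptr;
    int data_size = 0;
    RETURN_IF_ERROR(reader->ReadNextDataPage(&eos, &data, &data_size));
    if (eos) break;
    // ... decode rep/def levels and values from [data, data + data_size) ...
    reader->ReleaseResourcesOfLastPage(*batch_pool);
  }

  // 5. Transfer any var-len page buffers still referenced by tuples, free the rest.
  reader->Close(batch_pool);
  return Status::OK();
}

}  // namespace impala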

diff --git a/be/src/exec/parquet/CMakeLists.txt b/be/src/exec/parquet/CMakeLists.txt
index ec33f93..4589774 100644
--- a/be/src/exec/parquet/CMakeLists.txt
+++ b/be/src/exec/parquet/CMakeLists.txt
@@ -33,6 +33,8 @@ add_library(Parquet
   parquet-column-stats.cc
   parquet-level-decoder.cc
   parquet-metadata-utils.cc
+  parquet-column-chunk-reader.cc
+  parquet-page-reader.cc
   parquet-common.cc
   parquet-page-index.cc
 )
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index ae47cf8..8e5e7bc 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -48,6 +48,7 @@ class BaseScalarColumnReader;
 template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 class ScalarColumnReader;
 class BoolColumnReader;
+class ParquetPageReader;
 
 /// This scanner parses Parquet files located in HDFS, and writes the content as tuples in
 /// the Impala in-memory representation of data, e.g.  (tuples, rows, row batches).
@@ -376,6 +377,8 @@ class HdfsParquetScanner : public HdfsScanner {
   friend class BoolColumnReader;
   friend class HdfsParquetScannerTest;
   friend class ParquetPageIndex;
+  friend class ParquetColumnChunkReader;
+  friend class ParquetPageReader;
 
   /// Index of the current row group being processed. Initialized to -1 which indicates
   /// that we have not started processing the first row group yet (GetNext() has not yet
diff --git a/be/src/exec/parquet/parquet-column-chunk-reader.cc b/be/src/exec/parquet/parquet-column-chunk-reader.cc
new file mode 100644
index 0000000..573fd04
--- /dev/null
+++ b/be/src/exec/parquet/parquet-column-chunk-reader.cc
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet/parquet-column-chunk-reader.h"
+
+#include <string>
+
+#include "runtime/mem-pool.h"
+#include "runtime/runtime-state.h"
+#include "util/codec.h"
+
+#include "common/names.h"
+
+using namespace impala::io;
+
+using parquet::Encoding;
+
+namespace impala {
+
+const string PARQUET_PAGE_MEM_LIMIT_EXCEEDED =
+    "ParquetColumnChunkReader::$0() failed to allocate $1 bytes for $2.";
+
+// In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
+// if this matches those versions and compatibility workarounds need to be used.
+static bool RequiresSkippedDictionaryHeaderCheck(
+    const ParquetFileVersion& v) {
+  if (v.application != "impala") return false;
+  return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
+}
+
+ParquetColumnChunkReader::ParquetColumnChunkReader(HdfsParquetScanner* parent,
+    string schema_name, int slot_id, ValueMemoryType value_mem_type)
+  : parent_(parent),
+    schema_name_(schema_name),
+    page_reader_(parent, schema_name),
+    slot_id_(slot_id),
+    data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())),
+    value_mem_type_(value_mem_type)
+{
+}
+
+ParquetColumnChunkReader::~ParquetColumnChunkReader() {}
+
+Status ParquetColumnChunkReader::InitColumnChunk(const HdfsFileDesc& file_desc,
+    const parquet::ColumnChunk& col_chunk, int row_group_idx,
+    std::vector<io::ScanRange::SubRange>&& sub_ranges) {
+  if (col_chunk.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED) {
+    RETURN_IF_ERROR(Codec::CreateDecompressor(nullptr, false,
+        ConvertParquetToImpalaCodec(col_chunk.meta_data.codec), &decompressor_));
+  }
+
+  RETURN_IF_ERROR(page_reader_.InitColumnChunk(file_desc, col_chunk,
+        row_group_idx, move(sub_ranges)));
+
+  return Status::OK();
+}
+
+void ParquetColumnChunkReader::Close(MemPool* mem_pool) {
+  if (mem_pool != nullptr && value_mem_type_ == ValueMemoryType::VAR_LEN_STR) {
+    mem_pool->AcquireData(data_page_pool_.get(), false);
+  } else {
+    data_page_pool_->FreeAll();
+  }
+
+  if (decompressor_ != nullptr) decompressor_->Close();
+}
+
+void ParquetColumnChunkReader::ReleaseResourcesOfLastPage(MemPool& mem_pool) {
+  if (value_mem_type_ == ValueMemoryType::VAR_LEN_STR) {
+    mem_pool.AcquireData(data_page_pool_.get(), false);
+  } else {
+    data_page_pool_->FreeAll();
+  }
+  // We don't hold any pointers to earlier pages in the stream - we can safely free
+  // any I/O or boundary buffer.
+  // TODO: is this really needed? The read in ReadPageHeader() will release these
+  // resources anyway.
+  stream()->ReleaseCompletedResources(false);
+}
+
+Status ParquetColumnChunkReader::StartScan() {
+  DCHECK_GT(io_reservation_, 0);
+  RETURN_IF_ERROR(page_reader_.StartScan(io_reservation_));
+  return Status::OK();
+}
+
+Status ParquetColumnChunkReader::SkipPageData() {
+  return page_reader_.SkipPageData();
+}
+
+Status ParquetColumnChunkReader::TryReadDictionaryPage(bool* is_dictionary_page,
+    bool* eos, bool skip_data, ScopedBuffer* uncompressed_buffer, uint8_t** dict_values,
+    int64_t* data_size, int* num_entries) {
+  RETURN_IF_ERROR(page_reader_.ReadPageHeader(eos));
+
+  *is_dictionary_page = CurrentPageHeader().__isset.dictionary_page_header;
+  if (*eos || !(*is_dictionary_page)) return Status::OK();
+
+  if (skip_data) return SkipPageData();
+
+  RETURN_IF_ERROR(ReadDictionaryData(uncompressed_buffer, dict_values,
+        data_size, num_entries));
+
+  return Status::OK();
+}
+
+Status ParquetColumnChunkReader::ReadDictionaryData(ScopedBuffer* uncompressed_buffer,
+    uint8_t** dict_values, int64_t* data_size, int* num_entries) {
+  const parquet::PageHeader& current_page_header = CurrentPageHeader();
+  const parquet::DictionaryPageHeader* dict_header = nullptr;
+  if (current_page_header.__isset.dictionary_page_header) {
+    dict_header = &current_page_header.dictionary_page_header;
+  } else {
+    if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
+      return Status("Dictionary page does not have dictionary header set.");
+    }
+  }
+  if (dict_header != nullptr &&
+      dict_header->encoding != Encoding::PLAIN &&
+      dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
+    return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
+                  "for dictionary pages.");
+  }
+
+  *data_size = current_page_header.compressed_page_size;
+  uint8_t* data = nullptr;
+  RETURN_IF_ERROR(page_reader_.ReadPageData(&data));
+
+  if (current_page_header.uncompressed_page_size == 0) {
+    // The size of dictionary can be 0, if every value is null.
+    *data_size = 0;
+    *num_entries = 0;
+    *dict_values = nullptr;
+
+    return Status::OK();
+  }
+
+  // There are 3 different cases from the aspect of memory management:
+  // 1. If the column type is string, the dictionary will contain pointers to a buffer,
+  //    so the buffer's lifetime must be as long as any row batch that references it.
+  // 2. If the column type is not string, and the dictionary page is compressed, then a
+  //    temporary buffer is needed for the uncompressed values.
+  // 3. If the column type is not string, and the dictionary page is not compressed,
+  //    then no buffer is necessary.
+  const bool copy_buffer = value_mem_type_ == ValueMemoryType::FIXED_LEN_STR
+      || value_mem_type_ == ValueMemoryType::VAR_LEN_STR;
+
+  if (decompressor_.get() != nullptr || copy_buffer) {
+    int buffer_size = current_page_header.uncompressed_page_size;
+    if (copy_buffer) {
+      *dict_values = parent_->dictionary_pool_->TryAllocate(buffer_size); // case 1.
+    } else if (uncompressed_buffer->TryAllocate(buffer_size)) {
+      *dict_values = uncompressed_buffer->buffer(); // case 2
+    }
+    if (UNLIKELY(*dict_values == nullptr)) {
+      string details = Substitute(PARQUET_PAGE_MEM_LIMIT_EXCEEDED, "InitDictionary",
+          buffer_size, "dictionary");
+      return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
+               parent_->state_, details, buffer_size);
+    }
+  } else {
+    *dict_values = data; // case 3.
+  }
+
+  if (decompressor_.get() != nullptr) {
+    int uncompressed_size = current_page_header.uncompressed_page_size;
+    RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, *data_size, data,
+                    &uncompressed_size, dict_values));
+    VLOG_FILE << "Decompressed " << *data_size << " to " << uncompressed_size;
+    if (current_page_header.uncompressed_page_size != uncompressed_size) {
+      return Status(Substitute("Error decompressing dictionary page in file '$0'. "
+               "Expected $1 uncompressed bytes but got $2", filename(),
+               current_page_header.uncompressed_page_size, uncompressed_size));
+    }
+    *data_size = uncompressed_size;
+  } else {
+    if (current_page_header.uncompressed_page_size != *data_size) {
+      return Status(Substitute("Error reading dictionary page in file '$0'. "
+                               "Expected $1 bytes but got $2", filename(),
+                               current_page_header.uncompressed_page_size, *data_size));
+    }
+    if (copy_buffer) memcpy(*dict_values, data, *data_size);
+  }
+  *num_entries = dict_header->num_values;
+
+  return Status::OK();
+}
+
+Status ParquetColumnChunkReader::ReadNextDataPage(bool* eos, uint8_t** data,
+    int* data_size) {
+  // Read the next data page, skipping page types we don't care about. This method should
+  // be called after we know that the first page is not a dictionary page. Therefore, if
+  // we find a dictionary page, it is an error in the parquet file and we return a non-ok
+  // status (returned by page_reader_.ReadPageHeader()).
+  bool next_data_page_found = false;
+  while (!next_data_page_found) {
+    RETURN_IF_ERROR(page_reader_.ReadPageHeader(eos));
+
+    const parquet::PageHeader current_page_header = CurrentPageHeader();
+    DCHECK(page_reader_.PageHeadersRead() > 0
+        || !current_page_header.__isset.dictionary_page_header)
+        << "Should not call this method on the first page if it is a dictionary.";
+
+    if (*eos) return Status::OK();
+
+    if (current_page_header.type == parquet::PageType::DATA_PAGE) {
+      next_data_page_found = true;
+    } else {
+      // We can safely skip non-data pages
+      RETURN_IF_ERROR(SkipPageData());
+    }
+  }
+
+  return ReadDataPageData(data, data_size);
+}
+
+Status ParquetColumnChunkReader::ReadDataPageData(uint8_t** data, int* data_size) {
+  const parquet::PageHeader& current_page_header = CurrentPageHeader();
+
+  int compressed_size = current_page_header.compressed_page_size;
+  int uncompressed_size = current_page_header.uncompressed_page_size;
+  uint8_t* compressed_data;
+
+  RETURN_IF_ERROR(page_reader_.ReadPageData(&compressed_data));
+
+  const bool has_slot_desc = value_mem_type_ != ValueMemoryType::NO_SLOT_DESC;
+
+  *data_size = uncompressed_size;
+  if (decompressor_.get() != nullptr) {
+    SCOPED_TIMER(parent_->decompress_timer_);
+    uint8_t* decompressed_buffer;
+    RETURN_IF_ERROR(AllocateUncompressedDataPage(
+        uncompressed_size, "decompressed data", &decompressed_buffer));
+    RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
+        compressed_size, compressed_data, &uncompressed_size,
+        &decompressed_buffer));
+    // TODO: can't we call stream_->ReleaseCompletedResources(false); at this point?
+    VLOG_FILE << "Decompressed " << current_page_header.compressed_page_size
+              << " to " << uncompressed_size;
+    if (current_page_header.uncompressed_page_size != uncompressed_size) {
+      return Status(Substitute("Error decompressing data page in file '$0'. "
+          "Expected $1 uncompressed bytes but got $2", filename(),
+          current_page_header.uncompressed_page_size, uncompressed_size));
+    }
+    *data = decompressed_buffer;
+
+    if (has_slot_desc) {
+      parent_->scan_node_->UpdateBytesRead(slot_id_, uncompressed_size, compressed_size);
+      parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
+      parent_->UpdateCompressedPageSizeCounter(compressed_size);
+    }
+  } else {
+    if (compressed_size != uncompressed_size) {
+      return Status(Substitute("Error reading data page in file '$0'. "
+          "Expected $1 bytes but got $2", filename(),
+          compressed_size, uncompressed_size));
+    }
+
+    const bool copy_buffer = value_mem_type_ == ValueMemoryType::VAR_LEN_STR;
+
+    if (copy_buffer) {
+      // In this case returned batches will have pointers into the data page itself.
+      // We don't transfer disk I/O buffers out of the scanner so we need to copy
+      // the page data so that it can be attached to output batches.
+      uint8_t* buffer;
+      RETURN_IF_ERROR(AllocateUncompressedDataPage(
+          uncompressed_size, "uncompressed variable-length data", &buffer));
+      memcpy(buffer, compressed_data, uncompressed_size);
+      *data = buffer;
+    } else {
+      *data = compressed_data;
+    }
+    if (has_slot_desc) {
+      parent_->scan_node_->UpdateBytesRead(slot_id_, uncompressed_size, 0);
+      parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
+    }
+  }
+
+  return Status::OK();
+}
+
+Status ParquetColumnChunkReader::AllocateUncompressedDataPage(int64_t size,
+    const char* err_ctx, uint8_t** buffer) {
+  *buffer = data_page_pool_->TryAllocate(size);
+  if (*buffer == nullptr) {
+    string details =
+        Substitute(PARQUET_PAGE_MEM_LIMIT_EXCEEDED, "ReadDataPage", size, err_ctx);
+    return data_page_pool_->mem_tracker()->MemLimitExceeded(
+        parent_->state_, details, size);
+  }
+  return Status::OK();
+}
+
+}
diff --git a/be/src/exec/parquet/parquet-column-chunk-reader.h b/be/src/exec/parquet/parquet-column-chunk-reader.h
new file mode 100644
index 0000000..2d685bb
--- /dev/null
+++ b/be/src/exec/parquet/parquet-column-chunk-reader.h
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <boost/scoped_ptr.hpp>
+
+#include "exec/parquet/hdfs-parquet-scanner.h"
+#include "exec/parquet/parquet-page-reader.h"
+
+namespace impala {
+
+class MemPool;
+class Codec;
+
+/// A class to read data from Parquet pages. It handles the page headers, decompression
+/// and the possible copying of the data buffers.
+/// Before reading, InitColumnChunk(), set_io_reservation() and StartScan() must be called
+/// in this order.
+class ParquetColumnChunkReader {
+ public:
+
+  /// An enum containing information about the type and/or intended use of the values. It
+  /// is used to make decisions about memory management, for example when a buffer needs
+  /// to be copied.
+  /// In the future, more variants could be added if a new use case needs different memory
+  /// management.
+  enum class ValueMemoryType {
+    /// The values will not be read.
+    NO_SLOT_DESC,
+
+    /// Scalar (non-string) values.
+    SCALAR,
+    FIXED_LEN_STR,
+    VAR_LEN_STR
+  };
+
+  const char* filename() const { return parent_->filename(); }
+
+  const parquet::PageHeader& CurrentPageHeader() const {
+    return page_reader_.CurrentPageHeader();
+  }
+
+  io::ScanRange* scan_range() const { return page_reader_.scan_range(); }
+  parquet::PageType::type page_type() const { return CurrentPageHeader().type; }
+  ScannerContext::Stream* stream() const { return page_reader_.stream(); }
+
+  parquet::Encoding::type encoding() const {
+    return CurrentPageHeader().data_page_header.encoding;
+  }
+
+  /// Moved to implementation to be able to forward declare class in scoped_ptr.
+  ParquetColumnChunkReader(HdfsParquetScanner* parent, string schema_name, int slot_id,
+      ValueMemoryType value_mem_type);
+  ~ParquetColumnChunkReader();
+
+  /// Resets the reader for each row group in the file and creates the scan
+  /// range for the column, but does not start it. To start scanning,
+  /// set_io_reservation() must be called to assign reservation to this
+  /// column, followed by StartScan().
+  Status InitColumnChunk(
+      const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk,
+      int row_group_idx, std::vector<io::ScanRange::SubRange>&& sub_ranges);
+
+  void set_io_reservation(int bytes) {
+    io_reservation_ = bytes;
+  }
+
+  /// Starts the column scan range. InitColumnChunk() has to have been called and the
+  /// reader must have a reservation assigned via set_io_reservation(). This must be
+  /// called before any of the column data can be read (including dictionary and data
+  /// pages). Returns an error status if there was an error starting the scan or
+  /// allocating buffers for it.
+  Status StartScan();
+
+  /// If the column type is a variable length string and 'mem_pool' is not NULL, transfers
+  /// the remaining resources backing tuples to 'mem_pool' and frees up other resources.
+  /// Otherwise frees all resources.
+  void Close(MemPool* mem_pool);
+
+  /// The following functions can all advance stream_, which invalidates the buffer
+  /// returned by the previous call (unless copy_buffer was true).
+
+  /// Checks whether the next page is a dictionary page and if it is, reads the header and
+  /// either reads or skips the dictionary data, depending on 'skip_data'.
+  ///
+  /// After this method returns, the value of '*is_dictionary' can be used to determine
+  /// whether the page was a dictionary page.
+  /// If the data is read, '*dict_values' is set to point to the data and '*data_size' is
+  /// set to the length of the data. '*num_entries' is set to the number of elements. If
+  /// the column type is a string, then the buffer is allocated from the scanner's
+  /// dictionary_pool_ and will be valid as long as the scanner lives. Otherwise the
+  /// returned buffer will be valid only until the next function call that advances the
+  /// buffer.
+  /// 'uncompressed_buffer' is used to store data if a temporary buffer is needed for
+  /// decompression.
+  Status TryReadDictionaryPage(bool* is_dictionary_page, bool* eos, bool skip_data,
+      ScopedBuffer* uncompressed_buffer, uint8_t** dict_values,
+      int64_t* data_size, int* num_entries);
+
+  /// Reads the next data page to '*data' and '*data_size'.
+  /// Skips other types of pages (except for dictionary) until it finds a data page. If it
+  /// finds a dictionary page, returns an error as the dictionary page should be the first
+  /// page and this method should only be called if a data page is expected.
+  /// If the stream reaches the end before reading a complete page header, '*eos' is set
+  /// to true.
+  Status ReadNextDataPage(bool* eos, uint8_t** data, int* data_size);
+
+  /// If the column type is a variable length string, transfers the remaining resources
+  /// backing tuples to 'mem_pool' and frees up other resources. Otherwise frees all
+  /// resources.
+  void ReleaseResourcesOfLastPage(MemPool& mem_pool);
+
+ private:
+  HdfsParquetScanner* parent_;
+  string schema_name_;
+
+  ParquetPageReader page_reader_;
+
+  /// Used to track reads in the scanners counters.
+  int slot_id_;
+
+  /// Pool to allocate storage for data pages from - either decompression buffers for
+  /// compressed data pages or copies of the data page with var-len data to attach to
+  /// batches.
+  boost::scoped_ptr<MemPool> data_page_pool_;
+
+  /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set
+  /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group
+  /// by Reset().
+  int64_t io_reservation_ = 0;
+
+  boost::scoped_ptr<Codec> decompressor_;
+
+  /// See TryReadDictionaryPage() for information about the parameters.
+  Status ReadDictionaryData(ScopedBuffer* uncompressed_buffer, uint8_t** dict_values,
+      int64_t* data_size, int* num_entries);
+
+  /// Reads the data part of the next data page. Sets '*data' to point to the buffer and
+  /// '*data_size' to its size.
+  /// If the column type is a variable length string, the buffer is allocated from
+  /// data_page_pool_. Otherwise the returned buffer will be valid only until the next
+  /// function call that advances the buffer.
+  Status ReadDataPageData(uint8_t** data, int* data_size);
+
+  /// Skips the data part of the page. The header must be already read.
+  Status SkipPageData();
+
+  /// Allocate memory for the uncompressed contents of a data page of 'size' bytes from
+  /// 'data_page_pool_'. 'err_ctx' provides context for error messages. On success,
+  /// 'buffer' points to the allocated memory. Otherwise an error status is returned.
+  Status AllocateUncompressedDataPage(
+      int64_t size, const char* err_ctx, uint8_t** buffer);
+
+  ValueMemoryType value_mem_type_;
+};
+
+} // namespace impala
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index f044b86..ff237a9 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -17,34 +17,19 @@
 
 #include "parquet-column-readers.h"
 
-#include <sstream>
 #include <string>
-#include <boost/scoped_ptr.hpp>
-#include <gflags/gflags.h>
 #include <gutil/strings/substitute.h>
 
 #include "exec/parquet/hdfs-parquet-scanner.h"
 #include "exec/parquet/parquet-bool-decoder.h"
 #include "exec/parquet/parquet-level-decoder.h"
 #include "exec/parquet/parquet-metadata-utils.h"
-#include "exec/parquet/parquet-scratch-tuple-batch.h"
-#include "exec/read-write-util.h"
-#include "exec/scanner-context.inline.h"
 #include "parquet-collection-column-reader.h"
-#include "rpc/thrift-util.h"
-#include "runtime/exec-env.h"
-#include "runtime/io/disk-io-mgr.h"
-#include "runtime/io/request-context.h"
 #include "runtime/runtime-state.h"
-#include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
-#include "util/bit-util.h"
-#include "util/codec.h"
 #include "util/debug-util.h"
 #include "util/dict-encoding.h"
-#include "util/pretty-printer.h"
 #include "util/rle-encoding.h"
-#include "util/ubsan.h"
 
 #include "common/names.h"
 
@@ -53,25 +38,12 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
     "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
     "be converted from UTC to local time. Writes are unaffected.");
 
-// Max dictionary page header size in bytes. This is an estimate and only needs to be an
-// upper bound.
-static const int MAX_DICT_HEADER_SIZE = 100;
-
-// Max data page header size in bytes. This is an estimate and only needs to be an upper
-// bound. It is theoretically possible to have a page header of any size due to string
-// value statistics, but in practice we'll have trouble reading string values this large.
-// Also, this limit is in place to prevent impala from reading corrupt parquet files.
-DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
-
 using namespace impala::io;
 
 using parquet::Encoding;
 
 namespace impala {
 
-const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
-    "ParquetColumnReader::$0() failed to allocate $1 bytes for $2.";
-
 // Definition of variable declared in header for use of the
 // SHOULD_TRIGGER_COL_READER_DEBUG_ACTION macro.
 int parquet_column_reader_debug_count = 0;
@@ -268,20 +240,19 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// Pull out slow-path Status construction code
   void __attribute__((noinline)) SetDictDecodeError() {
     parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(),
-        slot_desc_->type().DebugString(), stream_->file_offset());
+        slot_desc_->type().DebugString(), col_chunk_reader_.stream()->file_offset());
   }
 
   void __attribute__((noinline)) SetPlainDecodeError() {
     parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(),
-        slot_desc_->type().DebugString(), stream_->file_offset());
+        slot_desc_->type().DebugString(), col_chunk_reader_.stream()->file_offset());
   }
 
   void __attribute__((noinline)) SetBoolDecodeError() {
     parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_BOOL_VALUE, filename(),
-        PrintThriftEnum(page_encoding_), stream_->file_offset());
+        PrintThriftEnum(page_encoding_), col_chunk_reader_.stream()->file_offset());
   }
 
-
   /// Dictionary decoder for decoding column values.
   DictDecoder<InternalType> dict_decoder_;
 
@@ -348,7 +319,7 @@ Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPag
   DCHECK_GE(size, 0);
   DCHECK(slot_desc_ == nullptr || slot_desc_->type().type != TYPE_BOOLEAN)
       << "Bool has specialized impl";
-  page_encoding_ = current_page_header_.data_page_header.encoding;
+  page_encoding_ = col_chunk_reader_.encoding();
   if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY
       && page_encoding_ != parquet::Encoding::PLAIN) {
     return GetUnsupportedDecodingError();
@@ -381,7 +352,7 @@ Status ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::InitDataPage(
     uint8_t* data, int size) {
   // Data can be empty if the column contains all NULLs
   DCHECK_GE(size, 0);
-  page_encoding_ = current_page_header_.data_page_header.encoding;
+  page_encoding_ = col_chunk_reader_.encoding();
 
   /// Boolean decoding is delegated to 'bool_decoder_'.
   if (bool_decoder_->SetData(page_encoding_, data, size)) return Status::OK();
@@ -1040,14 +1011,6 @@ bool ScalarColumnReader<DateValue, parquet::Type::INT32, true>::ValidateValue(
   return true;
 }
 
-// In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
-// if this matches those versions and compatibility workarounds need to be used.
-static bool RequiresSkippedDictionaryHeaderCheck(
-    const ParquetFileVersion& v) {
-  if (v.application != "impala") return false;
-  return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
-}
-
 void BaseScalarColumnReader::CreateSubRanges(vector<ScanRange::SubRange>* sub_ranges) {
   sub_ranges->clear();
   if (!DoesPageFiltering()) return;
@@ -1078,10 +1041,9 @@ Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
       parent_->filename(), row_group_idx, col_idx(), schema_element(),
       parent_->state_));
   num_buffered_values_ = 0;
+
   data_ = nullptr;
   data_end_ = nullptr;
-  stream_ = nullptr;
-  io_reservation_ = 0;
   metadata_ = &col_chunk.meta_data;
   num_values_read_ = 0;
   def_level_ = ParquetLevel::INVALID_LEVEL;
@@ -1089,292 +1051,62 @@ Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
   rep_level_ = max_rep_level() == 0 ? 0 : ParquetLevel::INVALID_LEVEL;
   pos_current_value_ = ParquetLevel::INVALID_POS;
 
-  if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
-    RETURN_IF_ERROR(Codec::CreateDecompressor(
-        nullptr, false, ConvertParquetToImpalaCodec(metadata_->codec), &decompressor_));
-  }
-  int64_t col_start = col_chunk.meta_data.data_page_offset;
-  if (col_chunk.meta_data.__isset.dictionary_page_offset) {
-    // Already validated in ValidateColumnOffsets()
-    DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
-    col_start = col_chunk.meta_data.dictionary_page_offset;
-  }
-  int64_t col_len = col_chunk.meta_data.total_compressed_size;
-  if (col_len <= 0) {
-    return Status(Substitute("File '$0' contains invalid column chunk size: $1",
-        filename(), col_len));
-  }
-  int64_t col_end = col_start + col_len;
-
-  // Already validated in ValidateColumnOffsets()
-  DCHECK_GT(col_end, 0);
-  DCHECK_LT(col_end, file_desc.file_length);
-  const ParquetFileVersion& file_version = parent_->file_version_;
-  if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) {
-    // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
-    // dictionary page header size in total_compressed_size and total_uncompressed_size
-    // (see IMPALA-694). We pad col_len to compensate.
-    int64_t bytes_remaining = file_desc.file_length - col_end;
-    int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
-    col_len += pad;
-    col_end += pad;
-  }
-
-  // TODO: this will need to change when we have co-located files and the columns
-  // are different files.
-  if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
-    return Status(Substitute("Expected parquet column file path '$0' to match "
-        "filename '$1'", col_chunk.file_path, filename()));
-  }
-
-  const ScanRange* metadata_range = parent_->metadata_range_;
-  int64_t partition_id = parent_->context_->partition_descriptor()->id();
-  const ScanRange* split_range =
-      static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
-  // Determine if the column is completely contained within a local split.
-  bool col_range_local = split_range->expected_local()
-      && col_start >= split_range->offset()
-      && col_end <= split_range->offset() + split_range->len();
   vector<ScanRange::SubRange> sub_ranges;
   CreateSubRanges(&sub_ranges);
-  scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
-      filename(), col_len, col_start, move(sub_ranges), partition_id,
-      split_range->disk_id(),col_range_local, split_range->is_erasure_coded(),
-      file_desc.mtime, BufferOpts(split_range->cache_options()));
+
+  RETURN_IF_ERROR(col_chunk_reader_.InitColumnChunk(
+      file_desc, col_chunk, row_group_idx, move(sub_ranges)));
+
   ClearDictionaryDecoder();
   return Status::OK();
 }
 
 void BaseScalarColumnReader::Close(RowBatch* row_batch) {
-  if (row_batch != nullptr && PageContainsTupleData(page_encoding_)) {
-    row_batch->tuple_data_pool()->AcquireData(data_page_pool_.get(), false);
-  } else {
-    data_page_pool_->FreeAll();
-  }
-  if (decompressor_ != nullptr) decompressor_->Close();
+  col_chunk_reader_.Close(row_batch == nullptr ? nullptr : row_batch->tuple_data_pool());
   DictDecoderBase* dict_decoder = GetDictionaryDecoder();
   if (dict_decoder != nullptr) dict_decoder->Close();
 }
 
-Status BaseScalarColumnReader::StartScan() {
-  DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan.";
-  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
-  ScannerContext* context = parent_->context_;
-  DCHECK_GT(io_reservation_, 0);
-  bool needs_buffers;
-  RETURN_IF_ERROR(parent_->scan_node_->reader_context()->StartScanRange(
-      scan_range_, &needs_buffers));
-  if (needs_buffers) {
-    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
-        context->bp_client(), scan_range_, io_reservation_));
-  }
-  stream_ = parent_->context_->AddStream(scan_range_, io_reservation_);
-  DCHECK(stream_ != nullptr);
-  return Status::OK();
-}
-
-Status BaseScalarColumnReader::ReadPageHeader(bool peek,
-    parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) {
-  DCHECK(stream_ != nullptr);
-  *eos = false;
-
-  uint8_t* buffer;
-  int64_t buffer_size;
-  RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
-  // check for end of stream
-  if (buffer_size == 0) {
-    // The data pages contain fewer values than stated in the column metadata.
-    DCHECK(stream_->eosr());
-    DCHECK_LT(num_values_read_, metadata_->num_values);
-    // TODO for 2.3: node_.element->name isn't necessarily useful
-    ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID, metadata_->num_values,
-        num_values_read_, node_.element->name, filename());
-    RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
-    *eos = true;
-    return Status::OK();
-  }
-
-  // We don't know the actual header size until the thrift object is deserialized.  Loop
-  // until we successfully deserialize the header or exceed the maximum header size.
-  uint32_t header_size;
-  Status status;
-  while (true) {
-    header_size = buffer_size;
-    status = DeserializeThriftMsg(buffer, &header_size, true, next_page_header);
-    if (status.ok()) break;
-
-    if (buffer_size >= FLAGS_max_page_header_size) {
-      stringstream ss;
-      ss << "ParquetScanner: could not read data page because page header exceeded "
-         << "maximum size of "
-         << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
-      status.AddDetail(ss.str());
-      return status;
-    }
-
-    // Didn't read entire header, increase buffer size and try again
-    int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
-    status = Status::OK();
-    bool success = stream_->GetBytes(
-        new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
-    if (!success) {
-      DCHECK(!status.ok());
-      return status;
-    }
-    DCHECK(status.ok());
-
-    // Even though we increased the allowed buffer size, the number of bytes
-    // read did not change. The header is not limited by the buffer space,
-    // so it must be incomplete in the file.
-    if (buffer_size == new_buffer_size) {
-      DCHECK_NE(new_buffer_size, 0);
-      return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
-    }
-    DCHECK_GT(new_buffer_size, buffer_size);
-    buffer_size = new_buffer_size;
-  }
-
-  *next_header_size = header_size;
-
-  // Successfully deserialized current_page_header_
-  if (!peek && !stream_->SkipBytes(header_size, &status)) return status;
-
-  int data_size = next_page_header->compressed_page_size;
-  if (UNLIKELY(data_size < 0)) {
-    return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for "
-        "column '$2'", filename(), data_size, schema_element().name));
-  }
-  int uncompressed_size = next_page_header->uncompressed_page_size;
-  if (UNLIKELY(uncompressed_size < 0)) {
-    return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page "
-        "size $1 for column '$2'", filename(), uncompressed_size,
-        schema_element().name));
-  }
-
-  return Status::OK();
-}
-
 Status BaseScalarColumnReader::InitDictionary() {
-  // Peek at the next page header
-  bool eos;
-  parquet::PageHeader next_page_header;
-  uint32_t next_header_size;
-  DCHECK(stream_ != nullptr);
-  DCHECK(!HasDictionaryDecoder());
-
-  RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header,
-                                 &next_header_size, &eos));
-  if (eos) return Status::OK();
-  // The dictionary must be the first data page, so if the first page
-  // is not a dictionary, then there is no dictionary.
-  if (Ubsan::EnumToInt(&next_page_header.type) != parquet::PageType::DICTIONARY_PAGE) {
-    return Status::OK();
-  }
-
-  current_page_header_ = next_page_header;
-  Status status;
-  if (!stream_->SkipBytes(next_header_size, &status)) return status;
-
-  int data_size = current_page_header_.compressed_page_size;
-  if (slot_desc_ == nullptr) {
-    // Skip processing the dictionary page if we don't need to decode any values. In
-    // addition to being unnecessary, we are likely unable to successfully decode the
-    // dictionary values because we don't necessarily create the right type of scalar
-    // reader if there's no slot to read into (see CreateReader()).
-    if (!stream_->SkipBytes(data_size, &status)) return status;
-    return Status::OK();
-  }
+  // Dictionary encoding is not supported for booleans.
+  const bool is_boolean = node_.element->type == parquet::Type::BOOLEAN;
+  const bool skip_data = slot_desc_ == nullptr || is_boolean;
 
-  if (node_.element->type == parquet::Type::BOOLEAN) {
+  // TODO: maybe avoid malloc on every page?
+  ScopedBuffer uncompressed_buffer(parent_->dictionary_pool_->mem_tracker());
+  bool eos;
+  bool is_dictionary_page;
+  int64_t data_size;
+  int num_entries;
+  RETURN_IF_ERROR(col_chunk_reader_.TryReadDictionaryPage(&is_dictionary_page, &eos,
+        skip_data, &uncompressed_buffer, &data_, &data_size, &num_entries));
+  if (eos) return HandleTooEarlyEos();
+  if (is_dictionary_page && is_boolean) {
     return Status("Unexpected dictionary page. Dictionary page is not"
-       " supported for booleans.");
+        " supported for booleans.");
   }
-
-  const parquet::DictionaryPageHeader* dict_header = nullptr;
-  if (current_page_header_.__isset.dictionary_page_header) {
-    dict_header = &current_page_header_.dictionary_page_header;
-  } else {
-    if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
-      return Status("Dictionary page does not have dictionary header set.");
-    }
-  }
-  if (dict_header != nullptr &&
-      dict_header->encoding != Encoding::PLAIN &&
-      dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
-    return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
-                  "for dictionary pages.");
-  }
-
-  if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
-  data_end_ = data_ + data_size;
+  if (!is_dictionary_page || skip_data) return Status::OK();
 
   // The size of dictionary can be 0, if every value is null. The dictionary still has to
   // be reset in this case.
   DictDecoderBase* dict_decoder;
-  if (current_page_header_.uncompressed_page_size == 0) {
+  if (data_size == 0) {
+    data_end_ = data_;
     return CreateDictionaryDecoder(nullptr, 0, &dict_decoder);
   }
-
-  // There are 3 different cases from the aspect of memory management:
-  // 1. If the column type is string, the dictionary will contain pointers to a buffer,
-  //    so the buffer's lifetime must be as long as any row batch that references it.
-  // 2. If the column type is not string, and the dictionary page is compressed, then a
-  //    temporary buffer is needed for the uncompressed values.
-  // 3. If the column type is not string, and the dictionary page is not compressed,
-  //    then no buffer is necessary.
-  ScopedBuffer uncompressed_buffer(parent_->dictionary_pool_->mem_tracker());
-  uint8_t* dict_values = nullptr;
-  if (decompressor_.get() != nullptr || slot_desc_->type().IsStringType()) {
-    int buffer_size = current_page_header_.uncompressed_page_size;
-    if (slot_desc_->type().IsStringType()) {
-      dict_values = parent_->dictionary_pool_->TryAllocate(buffer_size); // case 1.
-    } else if (uncompressed_buffer.TryAllocate(buffer_size)) {
-      dict_values = uncompressed_buffer.buffer(); // case 2
-    }
-    if (UNLIKELY(dict_values == nullptr)) {
-      string details = Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "InitDictionary",
-          buffer_size, "dictionary");
-      return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
-               parent_->state_, details, buffer_size);
-    }
-  } else {
-    dict_values = data_; // case 3.
-  }
-
-  if (decompressor_.get() != nullptr) {
-    int uncompressed_size = current_page_header_.uncompressed_page_size;
-    const Status& status = decompressor_->ProcessBlock32(true, data_size, data_,
-        &uncompressed_size, &dict_values);
-    if (!status.ok()) {
-      return Status(Substitute("Error decompressing parquet file '$0' column '$1'"
-               " data_page_offset $2: $3", filename(), node_.element->name,
-               metadata_->data_page_offset, status.GetDetail()));
-    }
-    VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
-    if (current_page_header_.uncompressed_page_size != uncompressed_size) {
-      return Status(Substitute("Error decompressing dictionary page in file '$0'. "
-               "Expected $1 uncompressed bytes but got $2", filename(),
-               current_page_header_.uncompressed_page_size, uncompressed_size));
-    }
-  } else {
-    if (current_page_header_.uncompressed_page_size != data_size) {
-      return Status(Substitute("Error reading dictionary page in file '$0'. "
-                               "Expected $1 bytes but got $2", filename(),
-                               current_page_header_.uncompressed_page_size, data_size));
-    }
-    if (slot_desc_->type().IsStringType()) memcpy(dict_values, data_, data_size);
+  // We cannot add data_size to data_ until we know it is not a nullptr.
+  if (data_ == nullptr) {
+    return Status("The dictionary values could not be read properly.");
   }
+  data_end_ = data_ + data_size;
 
-  RETURN_IF_ERROR(CreateDictionaryDecoder(
-      dict_values, current_page_header_.uncompressed_page_size, &dict_decoder));
-  if (dict_header != nullptr &&
-      dict_header->num_values != dict_decoder->num_entries()) {
+  RETURN_IF_ERROR(CreateDictionaryDecoder(data_, data_size, &dict_decoder));
+  if (num_entries != dict_decoder->num_entries()) {
     return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
-                  slot_desc_->type().DebugString(),
-                  Substitute("Expected $0 entries but data contained $1 entries",
-                             dict_header->num_values, dict_decoder->num_entries()));
+        slot_desc_->type().DebugString(),
+        Substitute("Expected $0 entries but data contained $1 entries",
+          num_entries, dict_decoder->num_entries()));
   }
-
   return Status::OK();
 }
 
@@ -1387,176 +1119,55 @@ Status BaseScalarColumnReader::InitDictionaries(
 }
 
 Status BaseScalarColumnReader::ReadDataPage() {
-  // We're about to move to the next data page.  The previous data page is
+  // We're about to move to the next data page. The previous data page is
   // now complete, free up any memory allocated for it. If the data page contained
   // strings we need to attach it to the returned batch.
-  if (PageContainsTupleData(page_encoding_)) {
-    parent_->scratch_batch_->aux_mem_pool.AcquireData(data_page_pool_.get(), false);
-  } else {
-    data_page_pool_->FreeAll();
-  }
-  // We don't hold any pointers to earlier pages in the stream - we can safely free
-  // any I/O or boundary buffer.
-  stream_->ReleaseCompletedResources(false);
-
-  // Read the next data page, skipping page types we don't care about.
-  // We break out of this loop on the non-error case (a data page was found or we read all
-  // the pages).
-  while (true) {
-    DCHECK_EQ(num_buffered_values_, 0);
-    if ((DoesPageFiltering() &&
-         candidate_page_idx_ == candidate_data_pages_.size() - 1) ||
-        num_values_read_ == metadata_->num_values) {
-      // No more pages to read
-      // TODO: should we check for stream_->eosr()?
-      break;
-    } else if (num_values_read_ > metadata_->num_values) {
-      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
-          metadata_->num_values, num_values_read_, node_.element->name, filename());
-      RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
-      return Status::OK();
-    }
-
-    bool eos;
-    uint32_t header_size;
-    RETURN_IF_ERROR(ReadPageHeader(false /* peek */, &current_page_header_,
-                                   &header_size, &eos));
-    if (eos) return Status::OK();
-
-    if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
-      // Any dictionary is already initialized, as InitDictionary has already
-      // been called. There are two possibilities:
-      // 1. The parquet file has two dictionary pages
-      // OR
-      // 2. The parquet file does not have the dictionary as the first data page.
-      // Both are errors in the parquet file.
-      if (HasDictionaryDecoder()) {
-        return Status(Substitute("Corrupt Parquet file '$0': multiple dictionary pages "
-            "for column '$1'", filename(), schema_element().name));
-      } else {
-        return Status(Substitute("Corrupt Parquet file: '$0': dictionary page for "
-            "column '$1' is not the first page", filename(), schema_element().name));
-      }
-    }
-
-    Status status;
-    int data_size = current_page_header_.compressed_page_size;
-    if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
-      // We can safely skip non-data pages
-      if (!stream_->SkipBytes(data_size, &status)) {
-        DCHECK(!status.ok());
-        return status;
-      }
-      continue;
-    }
+  col_chunk_reader_.ReleaseResourcesOfLastPage(parent_->scratch_batch_->aux_mem_pool);
+
+  DCHECK_EQ(num_buffered_values_, 0);
+  if ((DoesPageFiltering() &&
+        candidate_page_idx_ == candidate_data_pages_.size() - 1) ||
+      num_values_read_ == metadata_->num_values) {
+    // No more pages to read
+    // TODO: should we check for stream_->eosr()?
+    return Status::OK();
+  } else if (num_values_read_ > metadata_->num_values) {
+    RETURN_IF_ERROR(LogCorruptNumValuesInMetadataError());
+    return Status::OK();
+  }
 
-    // Read Data Page
-    // TODO: when we start using page statistics, we will need to ignore certain corrupt
-    // statistics. See IMPALA-2208 and PARQUET-251.
-    if (!stream_->ReadBytes(data_size, &data_, &status)) {
-      DCHECK(!status.ok());
-      return status;
-    }
-    data_end_ = data_ + data_size;
-    int num_values = current_page_header_.data_page_header.num_values;
-    if (num_values < 0) {
-      return Status(Substitute("Error reading data page in Parquet file '$0'. "
+  bool eos;
+  int data_size;
+  RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPage(&eos, &data_, &data_size));
+  if (eos) return HandleTooEarlyEos();
+  data_end_ = data_ + data_size;
+  const parquet::PageHeader& current_page_header = col_chunk_reader_.CurrentPageHeader();
+  int num_values = current_page_header.data_page_header.num_values;
+  if (num_values < 0) {
+    return Status(Substitute("Error reading data page in Parquet file '$0'. "
           "Invalid number of values in metadata: $1", filename(), num_values));
-    }
-
-    num_buffered_values_ = num_values;
-    num_values_read_ += num_buffered_values_;
-
-    int uncompressed_size = current_page_header_.uncompressed_page_size;
-    if (decompressor_.get() != nullptr) {
-      SCOPED_TIMER(parent_->decompress_timer_);
-      uint8_t* decompressed_buffer;
-      RETURN_IF_ERROR(AllocateUncompressedDataPage(
-            uncompressed_size, "decompressed data", &decompressed_buffer));
-      const Status& status = decompressor_->ProcessBlock32(true,
-          current_page_header_.compressed_page_size, data_, &uncompressed_size,
-          &decompressed_buffer);
-      if (!status.ok()) {
-        return Status(Substitute("Error decompressing parquet file '$0' column '$1'"
-                 " data_page_offset $2: $3", filename(), node_.element->name,
-                 metadata_->data_page_offset, status.GetDetail()));
-      }
-
-      VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
-                << " to " << uncompressed_size;
-      if (current_page_header_.uncompressed_page_size != uncompressed_size) {
-        return Status(Substitute("Error decompressing data page in file '$0'. "
-            "Expected $1 uncompressed bytes but got $2", filename(),
-            current_page_header_.uncompressed_page_size, uncompressed_size));
-      }
-      data_ = decompressed_buffer;
-      data_size = current_page_header_.uncompressed_page_size;
-      data_end_ = data_ + data_size;
-      if (slot_desc() != nullptr) {
-        parent_->scan_node_->UpdateBytesRead(slot_desc()->id(), uncompressed_size,
-            current_page_header_.compressed_page_size);
-        parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
-        parent_->UpdateCompressedPageSizeCounter(
-            current_page_header_.compressed_page_size);
-      }
-    } else {
-      DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
-      if (current_page_header_.compressed_page_size != uncompressed_size) {
-        return Status(Substitute("Error reading data page in file '$0'. "
-            "Expected $1 bytes but got $2", filename(),
-            current_page_header_.compressed_page_size, uncompressed_size));
-      }
-      if (PageContainsTupleData(current_page_header_.data_page_header.encoding)) {
-        // In this case returned batches will have pointers into the data page itself.
-        // We don't transfer disk I/O buffers out of the scanner so we need to copy
-        // the page data so that it can be attached to output batches.
-        uint8_t* copy_buffer;
-        RETURN_IF_ERROR(AllocateUncompressedDataPage(
-              uncompressed_size, "uncompressed variable-length data", &copy_buffer));
-        memcpy(copy_buffer, data_, uncompressed_size);
-        data_ = copy_buffer;
-        data_end_ = data_ + uncompressed_size;
-      }
-      if (slot_desc() != nullptr) {
-        parent_->scan_node_->UpdateBytesRead(slot_desc()->id(), uncompressed_size, 0);
-        parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
-      }
-    }
+  }
+  num_buffered_values_ = num_values;
+  num_values_read_ += num_buffered_values_;
 
-    // Initialize the repetition level data
-    RETURN_IF_ERROR(rep_levels_.Init(filename(),
-        &current_page_header_.data_page_header.repetition_level_encoding,
+  /// TODO: Move the level decoder initialisation to ParquetPageReader to abstract away
+  /// the differences between Parquet header V1 and V2.
+  // Initialize the repetition level data
+  RETURN_IF_ERROR(rep_levels_.Init(filename(),
+        &current_page_header.data_page_header.repetition_level_encoding,
         parent_->perm_pool_.get(), parent_->state_->batch_size(), max_rep_level(), &data_,
         &data_size));
-
-    // Initialize the definition level data
-    RETURN_IF_ERROR(def_levels_.Init(filename(),
-        &current_page_header_.data_page_header.definition_level_encoding,
+  // Initialize the definition level data
+  RETURN_IF_ERROR(def_levels_.Init(filename(),
+        &current_page_header.data_page_header.definition_level_encoding,
         parent_->perm_pool_.get(), parent_->state_->batch_size(), max_def_level(), &data_,
         &data_size));
+  // Data can be empty if the column contains all NULLs
+  RETURN_IF_ERROR(InitDataPage(data_, data_size));
+  // Skip rows if needed.
+  RETURN_IF_ERROR(StartPageFiltering());
 
-    // Data can be empty if the column contains all NULLs
-    RETURN_IF_ERROR(InitDataPage(data_, data_size));
-
-    // Skip rows if needed.
-    RETURN_IF_ERROR(StartPageFiltering());
-
-    if (parent_->candidate_ranges_.empty()) COUNTER_ADD(parent_->num_pages_counter_, 1);
-    break;
-  }
-
-  return Status::OK();
-}
-
-Status BaseScalarColumnReader::AllocateUncompressedDataPage(int64_t size,
-    const char* err_ctx, uint8_t** buffer) {
-  *buffer = data_page_pool_->TryAllocate(size);
-  if (*buffer == nullptr) {
-    string details =
-        Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "ReadDataPage", size, err_ctx);
-    return data_page_pool_->mem_tracker()->MemLimitExceeded(
-        parent_->state_, details, size);
-  }
+  if (parent_->candidate_ranges_.empty()) COUNTER_ADD(parent_->num_pages_counter_, 1);
   return Status::OK();
 }
 
@@ -1767,6 +1378,19 @@ Status BaseScalarColumnReader::GetUnsupportedDecodingError() {
       filename(), PrintThriftEnum(page_encoding_), schema_element().name));
 }
 
+Status BaseScalarColumnReader::LogCorruptNumValuesInMetadataError() {
+  ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
+      metadata_->num_values, num_values_read_, node_.element->name, filename());
+  return parent_->state_->LogOrReturnError(msg);
+}
+
+Status BaseScalarColumnReader::HandleTooEarlyEos() {
+  // The data pages contain fewer values than stated in the column metadata.
+  DCHECK(col_chunk_reader_.stream()->eosr());
+  DCHECK_LT(num_values_read_, metadata_->num_values);
+  return LogCorruptNumValuesInMetadataError();
+}
+
 bool BaseScalarColumnReader::NextPage() {
   parent_->assemble_rows_timer_.Stop();
   parent_->parse_status_ = ReadDataPage();
diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h
index 6571379..1d035bf 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -22,9 +22,7 @@
 
 #include "exec/parquet/hdfs-parquet-scanner.h"
 #include "exec/parquet/parquet-level-decoder.h"
-#include "runtime/io/request-ranges.h"
-#include "util/bit-stream-utils.h"
-#include "util/codec.h"
+#include "exec/parquet/parquet-column-chunk-reader.h"
 
 namespace impala {
 
@@ -244,7 +242,8 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
     : ParquetColumnReader(parent, node, slot_desc),
-      data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
+      col_chunk_reader_(parent, node.element->name,
+        slot_desc != nullptr ? slot_desc->id() : -1, PageReaderValueMemoryType()) {
     DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
   }
 
@@ -264,7 +263,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// before any of the column data can be read (including dictionary and
   /// data pages). Returns an error status if there was an error starting the
   /// scan or allocating buffers for it.
-  Status StartScan();
+  Status StartScan() { return col_chunk_reader_.StartScan(); }
 
   /// Helper to start scans for multiple columns at once.
   static Status StartScans(const std::vector<BaseScalarColumnReader*> readers) {
@@ -274,14 +273,14 @@ class BaseScalarColumnReader : public ParquetColumnReader {
 
   virtual void Close(RowBatch* row_batch);
 
-  io::ScanRange* scan_range() const { return scan_range_; }
+  io::ScanRange* scan_range() const { return col_chunk_reader_.scan_range(); }
   int64_t total_len() const { return metadata_->total_compressed_size; }
   int col_idx() const { return node_.col_idx; }
   THdfsCompression::type codec() const {
     if (metadata_ == NULL) return THdfsCompression::NONE;
     return ConvertParquetToImpalaCodec(metadata_->codec);
   }
-  void set_io_reservation(int bytes) { io_reservation_ = bytes; }
+  void set_io_reservation(int bytes) { col_chunk_reader_.set_io_reservation(bytes); }
 
   /// Reads the next definition and repetition levels for this column. Initializes the
   /// next data page if necessary.
@@ -316,6 +315,8 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   // Friend parent scanner so it can perform validation (e.g. ValidateEndOfRowGroup())
   friend class HdfsParquetScanner;
 
+  ParquetColumnChunkReader col_chunk_reader_;
+
   // Class members that are accessed for every column should be included up here so they
   // fit in as few cache lines as possible.
 
@@ -348,26 +349,6 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// Metadata for the column for the current row group.
   const parquet::ColumnMetaData* metadata_ = nullptr;
 
-  boost::scoped_ptr<Codec> decompressor_;
-
-  /// The scan range for the column's data. Initialized for each row group by Reset().
-  io::ScanRange* scan_range_ = nullptr;
-
-  // Stream used to read data from 'scan_range_'. Initialized by StartScan().
-  ScannerContext::Stream* stream_ = nullptr;
-
-  /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set
-  /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group
-  /// by Reset().
-  int64_t io_reservation_ = 0;
-
-  /// Pool to allocate storage for data pages from - either decompression buffers for
-  /// compressed data pages or copies of the data page with var-len data to attach to
-  /// batches.
-  boost::scoped_ptr<MemPool> data_page_pool_;
-
-  /// Header for current data page.
-  parquet::PageHeader current_page_header_;
 
   /////////////////////////////////////////
   /// BEGIN: Members used for page filtering
@@ -453,19 +434,19 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// 'size' bytes remaining.
   virtual Status InitDataPage(uint8_t* data, int size) = 0;
 
-  /// Allocate memory for the uncompressed contents of a data page of 'size' bytes from
-  /// 'data_page_pool_'. 'err_ctx' provides context for error messages. On success,
-  /// 'buffer' points to the allocated memory. Otherwise an error status is returned.
-  Status AllocateUncompressedDataPage(
-      int64_t size, const char* err_ctx, uint8_t** buffer);
-
-  /// Returns true if a data page for this column with the specified 'encoding' may
-  /// contain strings referenced by returned batches. Cases where this is not true are:
-  /// * Dictionary-compressed pages, where any string data lives in 'dictionary_pool_'.
-  /// * Fixed-length slots, where there is no string data.
-  bool PageContainsTupleData(parquet::Encoding::type page_encoding) {
-    return page_encoding != parquet::Encoding::PLAIN_DICTIONARY
-        && slot_desc_ != nullptr && slot_desc_->type().IsVarLenStringType();
+  ParquetColumnChunkReader::ValueMemoryType PageReaderValueMemoryType() {
+    if (slot_desc_ == nullptr) {
+      return ParquetColumnChunkReader::ValueMemoryType::NO_SLOT_DESC;
+    }
+
+    const ColumnType& col_type = slot_desc_->type();
+    if (col_type.IsStringType()) {
+      if (col_type.IsVarLenStringType()) {
+        return ParquetColumnChunkReader::ValueMemoryType::VAR_LEN_STR;
+      }
+      return ParquetColumnChunkReader::ValueMemoryType::FIXED_LEN_STR;
+    }
+    return ParquetColumnChunkReader::ValueMemoryType::SCALAR;
   }
 
   /// Resets structures related to page filtering.
@@ -561,6 +542,9 @@ class BaseScalarColumnReader : public ParquetColumnReader {
 
   // Returns a detailed error message about unsupported encoding.
   Status GetUnsupportedDecodingError();
+
+  Status LogCorruptNumValuesInMetadataError();
+  Status HandleTooEarlyEos();
 };
 
 // Inline to allow inlining into collection and scalar column reader.
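
Side note (not part of the patch): the removed PageContainsTupleData() helper decided
whether a page's buffer may be referenced by returned row batches, and the new
ValueMemoryType enum gives ParquetColumnChunkReader the information it needs to make an
equivalent decision. Below is a minimal, self-contained sketch of how that decision could
be reconstructed from the enum and the page encoding; the enums are local stand-ins for
parquet::Encoding and ParquetColumnChunkReader::ValueMemoryType, and
MayContainReferencedStrings is a hypothetical helper, not code from this change.

    // Stand-in enums mirroring parquet::Encoding and
    // ParquetColumnChunkReader::ValueMemoryType, so the example compiles on its own.
    enum class Encoding { PLAIN, PLAIN_DICTIONARY };
    enum class ValueMemoryType { NO_SLOT_DESC, SCALAR, FIXED_LEN_STR, VAR_LEN_STR };

    // Mirrors the removed PageContainsTupleData(): only non-dictionary pages of
    // var-len string columns can be referenced by returned row batches; dictionary
    // pages keep their strings in the dictionary pool and fixed-length slots have
    // no var-len data.
    bool MayContainReferencedStrings(ValueMemoryType mem_type, Encoding page_encoding) {
      return page_encoding != Encoding::PLAIN_DICTIONARY
          && mem_type == ValueMemoryType::VAR_LEN_STR;
    }
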
diff --git a/be/src/exec/parquet/parquet-page-reader.cc b/be/src/exec/parquet/parquet-page-reader.cc
new file mode 100644
index 0000000..7765feb
--- /dev/null
+++ b/be/src/exec/parquet/parquet-page-reader.cc
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet/parquet-page-reader.h"
+
+#include <sstream>
+#include <string>
+#include <gflags/gflags.h>
+#include <gutil/strings/substitute.h>
+
+#include "common/names.h"
+#include "exec/scanner-context.inline.h"
+#include "rpc/thrift-util.h"
+#include "runtime/exec-env.h"
+#include "util/pretty-printer.h"
+
+
+// Max data page header size in bytes. This is an estimate and only needs to be an upper
+// bound. It is theoretically possible to have a page header of any size due to string
+// value statistics, but in practice we'll have trouble reading string values this large.
+// Also, this limit is in place to prevent impala from reading corrupt parquet files.
+DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
+
+using namespace impala::io;
+
+using parquet::Encoding;
+
+namespace impala {
+
+// Max dictionary page header size in bytes. This is an estimate and only needs to be an
+// upper bound.
+static const int MAX_DICT_HEADER_SIZE = 100;
+
+Status ParquetPageReader::InitColumnChunk(const HdfsFileDesc& file_desc,
+    const parquet::ColumnChunk& col_chunk, int row_group_idx,
+    std::vector<io::ScanRange::SubRange>&& sub_ranges) {
+  int64_t col_start = col_chunk.meta_data.data_page_offset;
+  if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+    // Already validated in ValidateColumnOffsets()
+    DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
+    col_start = col_chunk.meta_data.dictionary_page_offset;
+  }
+
+  int64_t col_len = col_chunk.meta_data.total_compressed_size;
+  if (col_len <= 0) {
+    return Status(Substitute("File '$0' contains invalid column chunk size: $1",
+          filename(), col_len));
+  }
+
+  int64_t col_end = col_start + col_len;
+  // Already validated in ValidateColumnOffsets()
+  DCHECK_GT(col_end, 0);
+  DCHECK_LT(col_end, file_desc.file_length);
+  const ParquetFileVersion& file_version = parent_->file_version_;
+  if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) {
+    // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+    // dictionary page header size in total_compressed_size and total_uncompressed_size
+    // (see IMPALA-694). We pad col_len to compensate.
+    int64_t bytes_remaining = file_desc.file_length - col_end;
+    int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
+    col_len += pad;
+    col_end += pad;
+  }
+
+  if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
+    return Status(Substitute("Expected parquet column file path '$0' to match "
+          "filename '$1'", col_chunk.file_path, filename()));
+  }
+
+  const ScanRange* metadata_range = parent_->metadata_range_;
+  int64_t partition_id = parent_->context_->partition_descriptor()->id();
+  const ScanRange* split_range =
+      static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
+  // Determine if the column is completely contained within a local split.
+  bool col_range_local = split_range->expected_local()
+      && col_start >= split_range->offset()
+      && col_end <= split_range->offset() + split_range->len();
+  scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
+      filename(), col_len, col_start, move(sub_ranges),
+      partition_id, split_range->disk_id(),
+      col_range_local, split_range->is_erasure_coded(), file_desc.mtime,
+      BufferOpts(split_range->cache_options()));
+  page_headers_read_ = 0;
+  dictionary_header_encountered_ = false;
+  state_ = State::Initialized;
+  return Status::OK();
+}
+
+Status ParquetPageReader::StartScan(int io_reservation) {
+  DCHECK_EQ(state_, State::Initialized);
+  DCHECK_GT(io_reservation, 0);
+  DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan.";
+
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+  ScannerContext* context = parent_->context_;
+  bool needs_buffers;
+  RETURN_IF_ERROR(parent_->scan_node_->reader_context()->StartScanRange(
+        scan_range_, &needs_buffers));
+  if (needs_buffers) {
+    RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
+          context->bp_client(), scan_range_, io_reservation));
+  }
+  stream_ = parent_->context_->AddStream(scan_range_, io_reservation);
+  DCHECK(stream_ != nullptr);
+
+  state_ = State::ToReadHeader;
+  return Status::OK();
+}
+
+Status ParquetPageReader::ReadPageHeader(bool* eos) {
+  DCHECK(state_ == State::ToReadHeader || state_ == State::ToReadData);
+  DCHECK(stream_ != nullptr);
+
+  *eos = false;
+  if (state_ == State::ToReadData) return Status::OK();
+
+  uint8_t* buffer;
+  int64_t buffer_size;
+  RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
+  // Check for end of stream.
+  if (buffer_size == 0) {
+    DCHECK(stream_->eosr());
+    *eos = true;
+    return Status::OK();
+  }
+  // We don't know the actual header size until the thrift object is deserialized. Loop
+  // until we successfully deserialize the header or exceed the maximum header size.
+  uint32_t header_size;
+  Status status;
+  parquet::PageHeader header;
+  while (true) {
+    header_size = buffer_size;
+    status = DeserializeThriftMsg(buffer, &header_size, true, &header);
+    if (status.ok()) break;
+
+    if (buffer_size >= FLAGS_max_page_header_size) {
+      stringstream ss;
+      ss << "ParquetScanner: could not read data page because page header exceeded "
+         << "maximum size of "
+         << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
+      status.AddDetail(ss.str());
+      return status;
+    }
+    // Didn't read entire header, increase buffer size and try again
+    int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
+    status = Status::OK();
+    bool success = stream_->GetBytes(
+        new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
+    if (!success) {
+      DCHECK(!status.ok());
+      return status;
+    }
+    DCHECK(status.ok());
+    // Even though we requested a larger buffer, the number of bytes peeked
+    // did not change, which means we hit the end of the stream: the page
+    // header must be truncated in the file.
+    if (buffer_size == new_buffer_size) {
+      DCHECK_NE(new_buffer_size, 0);
+      return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
+    }
+    DCHECK_GT(new_buffer_size, buffer_size);
+    buffer_size = new_buffer_size;
+  }
+  int data_size = header.compressed_page_size;
+  if (UNLIKELY(data_size < 0)) {
+    return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for "
+        "column '$2'", filename(), data_size, schema_name_));
+  }
+  int uncompressed_size = header.uncompressed_page_size;
+  if (UNLIKELY(uncompressed_size < 0)) {
+    return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page "
+        "size $1 for column '$2'", filename(), uncompressed_size,
+        schema_name_));
+  }
+  const bool is_dictionary = header.__isset.dictionary_page_header;
+  if (UNLIKELY(page_headers_read_ != 0 && is_dictionary)) {
+    // A dictionary page must be the first page in the column chunk, so any
+    // dictionary has already been handled at this point. There are two possibilities:
+    // 1. The parquet file has two dictionary pages,
+    // OR
+    // 2. The parquet file does not have the dictionary as the first page.
+    // Both are errors in the parquet file.
+    if (dictionary_header_encountered_) {
+      return Status(Substitute("Corrupt Parquet file '$0': multiple dictionary pages "
+          "for column '$1'", filename(), schema_name_));
+    } else {
+      return Status(Substitute("Corrupt Parquet file '$0': dictionary page for "
+          "column '$1' is not the first page", filename(), schema_name_));
+    }
+  }
+  RETURN_IF_ERROR(AdvanceStream(header_size));
+  current_page_header_ = header;
+  header_initialized_ = true;
+  page_headers_read_++;
+  dictionary_header_encountered_ = dictionary_header_encountered_ || is_dictionary;
+  state_ = State::ToReadData;
+  return Status::OK();
+}
+
+Status ParquetPageReader::ReadPageData(uint8_t** data) {
+  DCHECK_EQ(state_, State::ToReadData);
+  Status status;
+  if (!stream_->ReadBytes(current_page_header_.compressed_page_size, data, &status)) {
+    DCHECK(!status.ok());
+    return status;
+  }
+  state_ = State::ToReadHeader;
+  return Status::OK();
+}
+
+Status ParquetPageReader::SkipPageData() {
+  DCHECK_EQ(state_, State::ToReadData);
+  RETURN_IF_ERROR(AdvanceStream(current_page_header_.compressed_page_size));
+  state_ = State::ToReadHeader;
+  return Status::OK();
+}
+
+Status ParquetPageReader::AdvanceStream(int64_t bytes) {
+  Status status;
+  if (!stream_->SkipBytes(bytes, &status)) return status;
+  return Status::OK();
+}
+
+std::ostream& operator<<(std::ostream& out, const ParquetPageReader::State state) {
+  switch (state) {
+    case ParquetPageReader::State::Uninitialized: out << "Uninitialized"; break;
+    case ParquetPageReader::State::Initialized: out << "Initialized"; break;
+    case ParquetPageReader::State::ToReadHeader: out << "ToReadHeader"; break;
+    case ParquetPageReader::State::ToReadData: out << "ToReadData"; break;
+  }
+
+  return out;
+}
+
+} // namespace impala
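
Side note (not part of the patch): ReadPageHeader() above does not know the header's size
in advance, so it peeks at a prefix of the stream, attempts the Thrift deserialization,
and enlarges the peek window on failure until it succeeds, exceeds
FLAGS_max_page_header_size, or stops making progress (a truncated file). A standalone
sketch of the same grow-and-retry pattern follows; PeekStream and TryDeserializeHeader
are hypothetical stand-ins for ScannerContext::Stream and DeserializeThriftMsg().

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <stdexcept>
    #include <vector>

    // Stand-in for ScannerContext::Stream: returns up to 'n' bytes from the current
    // position without consuming them.
    struct PeekStream {
      std::vector<uint8_t> bytes;
      size_t Peek(size_t n, const uint8_t** out) const {
        *out = bytes.data();
        return std::min(n, bytes.size());
      }
    };

    // Stand-in for DeserializeThriftMsg(): returns true and sets '*consumed' to the
    // serialized header size if 'len' bytes were enough to hold a complete header.
    bool TryDeserializeHeader(const uint8_t* buf, size_t len, size_t* consumed);

    // Returns the size of the header that was found, or 0 at end of stream;
    // throws on corrupt input.
    size_t ReadHeaderWithGrowingPeek(const PeekStream& stream, size_t max_header_size) {
      const uint8_t* buf;
      size_t buffer_size = stream.Peek(1024, &buf);
      if (buffer_size == 0) return 0;  // end of stream, no more pages
      while (true) {
        size_t consumed;
        if (TryDeserializeHeader(buf, buffer_size, &consumed)) return consumed;
        if (buffer_size >= max_header_size) {
          throw std::runtime_error("page header exceeds maximum size");
        }
        // Incomplete header: double the peek window and retry.
        size_t new_size = stream.Peek(std::max<size_t>(buffer_size * 2, 1024), &buf);
        // No progress despite a larger request: the header is truncated in the file.
        if (new_size == buffer_size) throw std::runtime_error("unexpected end of stream");
        buffer_size = new_size;
      }
    }
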
diff --git a/be/src/exec/parquet/parquet-page-reader.h b/be/src/exec/parquet/parquet-page-reader.h
new file mode 100644
index 0000000..b98e94a
--- /dev/null
+++ b/be/src/exec/parquet/parquet-page-reader.h
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/parquet/hdfs-parquet-scanner.h"
+
+namespace impala {
+
+/// Reads Parquet page headers and data pages from a column chunk's scan range. Memory
+/// management of page data is not handled by this class. Before reading,
+/// InitColumnChunk() followed by StartScan() must be called.
+class ParquetPageReader {
+ public:
+  const char* filename() const { return parent_->filename(); }
+  io::ScanRange* scan_range() const { return scan_range_; }
+  ScannerContext::Stream* stream() const { return stream_; }
+  uint64_t PageHeadersRead() const { return page_headers_read_; }
+
+  ParquetPageReader(HdfsParquetScanner* parent, string schema_name)
+      : parent_(parent),
+        schema_name_(schema_name) {}
+
+  /// Resets the reader for each row group in the file and creates the scan range for the
+  /// column, but does not start it. To start scanning, call StartScan().
+  Status InitColumnChunk(const HdfsFileDesc& file_desc,
+      const parquet::ColumnChunk& col_chunk, int row_group_idx,
+      std::vector<io::ScanRange::SubRange>&& sub_ranges);
+
+  /// Starts the column scan range. InitColumnChunk() must have been called already, and
+  /// this method must be called before any of the column data (including dictionary and
+  /// data pages) can be read. 'io_reservation' is the amount of reservation assigned to
+  /// the column. Returns an error status if there was an error starting the scan or
+  /// allocating buffers for it.
+  Status StartScan(int io_reservation);
+
+  /// Reads the next page header if the data belonging to the current header has been read
+  /// or skipped, otherwise does nothing. If the stream reaches the end before reading a
+  /// complete page header, '*eos' is set to true.
+  Status ReadPageHeader(bool* eos);
+
+  /// Reads the next data page to 'data'. The header must already be read. It is invalid
+  /// to call this or SkipPageData() again without reading the next header.
+  Status ReadPageData(uint8_t** data);
+
+  /// Skips the next data page. The header must already be read. It is invalid to call
+  /// this or ReadPageData() again without reading the next header.
+  Status SkipPageData();
+
+  const parquet::PageHeader& CurrentPageHeader() const {
+    DCHECK(header_initialized_);
+    return current_page_header_;
+  }
+
+ private:
+  Status AdvanceStream(int64_t bytes);
+
+  HdfsParquetScanner* parent_;
+  string schema_name_;
+
+  /// Header for current data page.
+  parquet::PageHeader current_page_header_;
+  bool header_initialized_ = false;
+
+  /// The scan range for the column's data. Initialized for each row group by Reset().
+  io::ScanRange* scan_range_ = nullptr;
+
+  /// Stream used to read data from 'scan_range_'. Initialized by StartScan().
+  ScannerContext::Stream* stream_ = nullptr;
+
+  uint64_t page_headers_read_ = 0;
+  bool dictionary_header_encountered_ = false;
+
+  /// We maintain a state machine for debug purposes. The state transitions are
+  /// the following:
+  ///
+  ///                              Uninitialized
+  ///                                    +
+  ///                                    |
+  ///                                    |  (InitColumnChunk)
+  ///                                    |
+  ///                                    v
+  ///                   +-------->  Initialized
+  ///                   |                +
+  ///                   |                |
+  ///                   |                |     (StartScan)
+  ///                   |                |
+  ///                   |                v
+  /// (InitColumnChunk) +--------+ ToReadHeader <--------------+
+  ///                   |                +                     |
+  ///                   |                |                     |
+  ///                   |                |   (ReadPageHeader)  | (ReadPageData,
+  ///                   |                |                     |  SkipPageData)
+  ///                   |                v                     |
+  ///                   +--------+  ToReadData  +--------------+
+  ///
+  enum class State {
+    Uninitialized,
+    Initialized,
+    ToReadHeader,
+    ToReadData,
+  };
+
+  friend std::ostream& operator<<(
+      std::ostream& out, const ParquetPageReader::State state);
+
+  State state_ = State::Uninitialized;
+};
+
+} // namespace impala
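
Closing note (not part of the patch): to make the state machine above concrete, here is a
hedged sketch of how a caller such as ParquetColumnChunkReader might drive
ParquetPageReader over one column chunk. ReadAllPages, ShouldSkipPage and ProcessPage are
hypothetical names; file_desc, col_chunk, row_group_idx, sub_ranges and io_reservation
are assumed to come from the scanner's row-group metadata.

    // Hedged usage sketch only; the real driver lives in ParquetColumnChunkReader.
    Status ReadAllPages(ParquetPageReader* page_reader, const HdfsFileDesc& file_desc,
        const parquet::ColumnChunk& col_chunk, int row_group_idx,
        std::vector<io::ScanRange::SubRange> sub_ranges, int io_reservation) {
      // Uninitialized -> Initialized: create (but do not start) the column's scan range.
      RETURN_IF_ERROR(page_reader->InitColumnChunk(
          file_desc, col_chunk, row_group_idx, std::move(sub_ranges)));
      // Initialized -> ToReadHeader: start the scan range and attach the stream.
      RETURN_IF_ERROR(page_reader->StartScan(io_reservation));
      while (true) {
        bool eos;
        // ToReadHeader -> ToReadData (no-op if the previous page's data is unconsumed).
        RETURN_IF_ERROR(page_reader->ReadPageHeader(&eos));
        if (eos) break;
        const parquet::PageHeader& header = page_reader->CurrentPageHeader();
        if (ShouldSkipPage(header)) {  // hypothetical, e.g. page filtering
          // ToReadData -> ToReadHeader without materializing the page.
          RETURN_IF_ERROR(page_reader->SkipPageData());
        } else {
          uint8_t* data;
          // ToReadData -> ToReadHeader; 'data' points to compressed_page_size bytes.
          RETURN_IF_ERROR(page_reader->ReadPageData(&data));
          ProcessPage(header, data);  // hypothetical downstream handling
        }
      }
      return Status::OK();
    }
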