Posted to commits@impala.apache.org by ta...@apache.org on 2016/07/15 18:27:19 UTC

[1/5] incubator-impala git commit: IMPALA-3678: Fix migration of predicates into union operands with an order by + limit.

Repository: incubator-impala
Updated Branches:
  refs/heads/master baf8fe202 -> 45740c8bc


IMPALA-3678: Fix migration of predicates into union operands with an order by + limit.

There were two separate issues:

First, the SortNode incorrectly picked up unassigned conjuncts and then asserted that its
conjunct list was empty. When predicates are migrated into union operands, there can
actually be unassigned conjuncts bound by the SortNode's tuple id, so they were being
picked up incorrectly. The fix is to not pick up unassigned conjuncts in the SortNode and
to let them be picked up later (by a SelectNode).

Second, when generating the plan for a union operand, we were missing the call that
grafts a SelectNode on top of the operand's plan to capture unassigned conjuncts.
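
For illustration, a condensed variant of the planner tests added below (the table names
come from the functional test schema used by those tests; this exact query is a sketch,
not one of the new test cases):

  -- The outer predicate is migrated into both union operands. In the first
  -- operand it can be assigned directly to the scan; in the second it is bound
  -- by the SortNode's tuple id and must be evaluated by a SelectNode placed
  -- above the TOP-N.
  select * from
    ((select * from functional.alltypes)
      union all
     (select * from functional.alltypessmall order by id limit 10)) v
  where v.id < 10;

Together, the two fixes ensure such conjuncts land in a SelectNode on top of the operand's
plan instead of being picked up by the SortNode and tripping its empty-conjuncts
precondition.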

Change-Id: I95d105ac15a3dc975e52dfd418890e13f912dfce
Reviewed-on: http://gerrit.cloudera.org:8080/3600
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Alex Behm <al...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/45740c8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/45740c8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/45740c8b

Branch: refs/heads/master
Commit: 45740c8bcc05fb5075dd663c81de1cfc69ee5f1b
Parents: 6ee15fa
Author: Alex Behm <al...@cloudera.com>
Authored: Fri Jul 8 14:24:04 2016 -0700
Committer: Taras Bobrovytsky <ta...@apache.org>
Committed: Fri Jul 15 18:27:05 2016 +0000

----------------------------------------------------------------------
 .../impala/planner/SingleNodePlanner.java       |  3 +
 .../com/cloudera/impala/planner/SortNode.java   |  3 +-
 .../queries/PlannerTest/union.test              | 84 +++++++++++++++++++-
 3 files changed, 88 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/45740c8b/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java b/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java
index 9f07edb..84552e0 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java
@@ -1595,6 +1595,9 @@ public class SingleNodePlanner {
         }
       }
       PlanNode opPlan = createQueryPlan(queryStmt, op.getAnalyzer(), false);
+      // There may still be unassigned conjuncts if the operand has an order by + limit.
+      // Place them into a SelectNode on top of the operand's plan.
+      opPlan = addUnassignedConjuncts(analyzer, opPlan.getTupleIds(), opPlan);
       if (opPlan instanceof EmptySetNode) continue;
       unionNode.addChild(opPlan, op.getQueryStmt().getBaseTblResultExprs());
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/45740c8b/fe/src/main/java/com/cloudera/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/SortNode.java b/fe/src/main/java/com/cloudera/impala/planner/SortNode.java
index cd5f58c..6cc0077 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/SortNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/SortNode.java
@@ -89,7 +89,8 @@ public class SortNode extends PlanNode {
 
   @Override
   public void init(Analyzer analyzer) throws InternalException {
-    assignConjuncts(analyzer);
+    // Do not assignConjuncts() here, so that conjuncts bound by this SortNode's tuple id
+    // can be placed in a downstream SelectNode. A SortNode cannot evaluate conjuncts.
     Preconditions.checkState(conjuncts_.isEmpty());
     // Compute the memory layout for the generated tuple.
     computeMemLayout(analyzer);
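
A hypothetical query (not part of this patch) that shows why the migrated conjuncts
cannot simply be evaluated by the sort:

  -- The predicate must apply to the 10 rows produced by the inner LIMIT.
  -- Filtering before or inside the top-n would change which rows survive the
  -- limit, so the planner places the predicate in a SelectNode above the
  -- TOP-N node rather than assigning it to the SortNode.
  select * from
    (select * from functional.alltypes order by id limit 10) v
  where v.int_col > 20;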

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/45740c8b/testdata/workloads/functional-planner/queries/PlannerTest/union.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/union.test b/testdata/workloads/functional-planner/queries/PlannerTest/union.test
index 86c59c7..1dfbdcc 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/union.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/union.test
@@ -2688,4 +2688,86 @@ select l_orderkey from tpch.lineitem UNION DISTINCT (select l_orderkey from tpch
 |
 01:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
-====
\ No newline at end of file
+====
+# IMPALA-3678: Predicates migrated into a union operand should be placed into
+# a SelectNode if that union operand has an order by + limit.
+select * from
+  ((select * from functional.alltypes)
+    union all
+   (select * from functional.alltypes order by id)
+    union all
+   (select * from functional.alltypessmall order by id limit 10)
+    union all
+   (select * from functional.alltypestiny order by id limit 20 offset 10)) v
+where v.id < 10 and v.int_col > 20
+---- PLAN
+00:UNION
+|
+|--08:SELECT
+|  |  predicates: id < 10, int_col > 20
+|  |
+|  07:TOP-N [LIMIT=20 OFFSET=10]
+|  |  order by: id ASC
+|  |
+|  06:SCAN HDFS [functional.alltypestiny]
+|     partitions=4/4 files=4 size=460B
+|
+|--05:SELECT
+|  |  predicates: id < 10, int_col > 20
+|  |
+|  04:TOP-N [LIMIT=10]
+|  |  order by: id ASC
+|  |
+|  03:SCAN HDFS [functional.alltypessmall]
+|     partitions=4/4 files=4 size=6.32KB
+|
+|--02:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|     predicates: functional.alltypes.id < 10, functional.alltypes.int_col > 20
+|
+01:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   predicates: functional.alltypes.id < 10, functional.alltypes.int_col > 20
+====
+# IMPALA-3678: Same as above but with union distinct.
+select * from
+  ((select * from functional.alltypes)
+    union distinct
+   (select * from functional.alltypes order by id)
+    union distinct
+   (select * from functional.alltypessmall order by id limit 10)
+    union distinct
+   (select * from functional.alltypestiny order by id limit 20 offset 10)) v
+where v.id < 10 and v.int_col > 20
+---- PLAN
+09:AGGREGATE [FINALIZE]
+|  group by: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+|
+00:UNION
+|
+|--08:SELECT
+|  |  predicates: id < 10, int_col > 20
+|  |
+|  07:TOP-N [LIMIT=20 OFFSET=10]
+|  |  order by: id ASC
+|  |
+|  06:SCAN HDFS [functional.alltypestiny]
+|     partitions=4/4 files=4 size=460B
+|
+|--05:SELECT
+|  |  predicates: id < 10, int_col > 20
+|  |
+|  04:TOP-N [LIMIT=10]
+|  |  order by: id ASC
+|  |
+|  03:SCAN HDFS [functional.alltypessmall]
+|     partitions=4/4 files=4 size=6.32KB
+|
+|--02:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|     predicates: functional.alltypes.id < 10, functional.alltypes.int_col > 20
+|
+01:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   predicates: functional.alltypes.id < 10, functional.alltypes.int_col > 20
+====


[4/5] incubator-impala git commit: IMPALA-3845: Split up hdfs-parquet-scanner.cc into more files/components.

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index fa64f84..1ebb650 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -17,17 +17,15 @@
 #include <limits> // for std::numeric_limits
 #include <queue>
 
-#include <boost/algorithm/string.hpp>
 #include <gflags/gflags.h>
 #include <gutil/strings/substitute.h>
 
-#include "common/object-pool.h"
 #include "common/logging.h"
+#include "exec/hdfs-scanner.h"
 #include "exec/hdfs-scan-node.h"
+#include "exec/parquet-column-readers.h"
 #include "exec/scanner-context.inline.h"
-#include "exec/read-write-util.h"
 #include "exprs/expr.h"
-#include "gutil/bits.h"
 #include "runtime/collection-value-builder.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime-state.h"
@@ -37,47 +35,18 @@
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
 #include "runtime/string-value.h"
-#include "util/bitmap.h"
-#include "util/bit-util.h"
-#include "util/decompress.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
-#include "util/dict-encoding.h"
-#include "util/rle-encoding.h"
-#include "util/runtime-profile-counters.h"
 #include "rpc/thrift-util.h"
 
 #include "common/names.h"
 
-using boost::algorithm::is_any_of;
-using boost::algorithm::split;
-using boost::algorithm::token_compress_on;
+using strings::Substitute;
 using namespace impala;
-using namespace strings;
-
-// Provide a workaround for IMPALA-1658.
-DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
-    "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
-    "be converted from UTC to local time. Writes are unaffected.");
 
 DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the percentage of "
     "rows rejected by a runtime filter drops below this value, the filter is disabled.");
 
-const int64_t HdfsParquetScanner::FOOTER_SIZE = 100 * 1024;
-const int16_t HdfsParquetScanner::ROW_GROUP_END = numeric_limits<int16_t>::min();
-const int16_t HdfsParquetScanner::INVALID_LEVEL = -1;
-const int16_t HdfsParquetScanner::INVALID_POS = -1;
-
-// Max data page header size in bytes. This is an estimate and only needs to be an upper
-// bound. It is theoretically possible to have a page header of any size due to string
-// value statistics, but in practice we'll have trouble reading string values this large.
-// Also, this limit is in place to prevent impala from reading corrupt parquet files.
-DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
-
-// Max dictionary page header size in bytes. This is an estimate and only needs to be an
-// upper bound.
-const int MAX_DICT_HEADER_SIZE = 100;
-
 // The number of rows between checks to see if a filter is not effective, and should be
 // disabled. Must be a power of two.
 const int ROWS_PER_FILTER_SELECTIVITY_CHECK = 16 * 1024;
@@ -85,19 +54,14 @@ static_assert(
     !(ROWS_PER_FILTER_SELECTIVITY_CHECK & (ROWS_PER_FILTER_SELECTIVITY_CHECK - 1)),
     "ROWS_PER_FILTER_SELECTIVITY_CHECK must be a power of two");
 
-// FILE_CHECKs are conditions that we expect to be true but could fail due to a malformed
-// input file. They differentiate these cases from DCHECKs, which indicate conditions that
-// are true unless there's a bug in Impala. We would ideally always return a bad Status
-// instead of failing a FILE_CHECK, but in many cases we use FILE_CHECK instead because
-// there's a performance cost to doing the check in a release build, or just due to legacy
-// code.
-#define FILE_CHECK(a) DCHECK(a)
-#define FILE_CHECK_EQ(a, b) DCHECK_EQ(a, b)
-#define FILE_CHECK_NE(a, b) DCHECK_NE(a, b)
-#define FILE_CHECK_GT(a, b) DCHECK_GT(a, b)
-#define FILE_CHECK_LT(a, b) DCHECK_LT(a, b)
-#define FILE_CHECK_GE(a, b) DCHECK_GE(a, b)
-#define FILE_CHECK_LE(a, b) DCHECK_LE(a, b)
+// Max dictionary page header size in bytes. This is an estimate and only needs to be an
+// upper bound.
+const int MAX_DICT_HEADER_SIZE = 100;
+
+const int64_t HdfsParquetScanner::FOOTER_SIZE;
+const int16_t HdfsParquetScanner::ROW_GROUP_END;
+const int16_t HdfsParquetScanner::INVALID_LEVEL;
+const int16_t HdfsParquetScanner::INVALID_POS;
 
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNode* scan_node,
     const std::vector<HdfsFileDesc*>& files) {
@@ -168,967 +132,18 @@ DiskIoMgr::ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
 
 namespace impala {
 
-/// Helper struct that holds a batch of tuples allocated from a mem pool, as well
-/// as state associated with iterating over its tuples and transferring
-/// them to an output batch in TransferScratchTuples().
-struct ScratchTupleBatch {
-  // Memory backing the batch of tuples. Allocated from batch's tuple data pool.
-  uint8_t* tuple_mem;
-  // Keeps track of the current tuple index.
-  int tuple_idx;
-  // Number of valid tuples in tuple_mem.
-  int num_tuples;
-  // Cached for convenient access.
-  const int tuple_byte_size;
-
-  // Helper batch for safely allocating tuple_mem from its tuple data pool using
-  // ResizeAndAllocateTupleBuffer().
-  RowBatch batch;
-
-  ScratchTupleBatch(
-      const RowDescriptor& row_desc, int batch_size, MemTracker* mem_tracker)
-    : tuple_mem(NULL),
-      tuple_idx(0),
-      num_tuples(0),
-      tuple_byte_size(row_desc.GetRowSize()),
-      batch(row_desc, batch_size, mem_tracker) {
-    DCHECK_EQ(row_desc.tuple_descriptors().size(), 1);
-  }
-
-  Status Reset(RuntimeState* state) {
-    tuple_idx = 0;
-    num_tuples = 0;
-    // Buffer size is not needed.
-    int64_t buffer_size;
-    RETURN_IF_ERROR(batch.ResizeAndAllocateTupleBuffer(state, &buffer_size, &tuple_mem));
-    return Status::OK();
-  }
-
-  inline Tuple* GetTuple(int tuple_idx) const {
-    return reinterpret_cast<Tuple*>(tuple_mem + tuple_idx * tuple_byte_size);
-  }
-
-  inline MemPool* mem_pool() { return batch.tuple_data_pool(); }
-  inline int capacity() const { return batch.capacity(); }
-  inline uint8_t* CurrTuple() const { return tuple_mem + tuple_idx * tuple_byte_size; }
-  inline uint8_t* TupleEnd() const { return tuple_mem + num_tuples * tuple_byte_size; }
-  inline bool AtEnd() const { return tuple_idx == num_tuples; }
-};
-
-const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate "
-    "$1 bytes for $2.";
-
 HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state)
     : HdfsScanner(scan_node, state),
       scratch_batch_(new ScratchTupleBatch(
           scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())),
       metadata_range_(NULL),
       dictionary_pool_(new MemPool(scan_node->mem_tracker())),
-      assemble_rows_timer_(scan_node_->materialize_tuple_timer()) {
+      assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
+      num_cols_counter_(NULL),
+      num_row_groups_counter_(NULL) {
   assemble_rows_timer_.Stop();
 }
 
-HdfsParquetScanner::~HdfsParquetScanner() {
-}
-
-// TODO for 2.3: move column readers to separate file
-
-/// Decoder for all supported Parquet level encodings. Optionally reads, decodes, and
-/// caches level values in batches.
-/// Level values are unsigned 8-bit integers because we support a maximum nesting
-/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
-/// populating the level cache (e.g., with RLE we can memset() repeated values).
-///
-/// Inherits from RleDecoder instead of containing one for performance reasons.
-/// The containment design would require two BitReaders per column reader. The extra
-/// BitReader causes enough bloat for a column reader to require another cache line.
-/// TODO: It is not clear whether the inheritance vs. containment choice still makes
-/// sense with column-wise materialization. The containment design seems cleaner and
-/// we should revisit.
-class HdfsParquetScanner::LevelDecoder : public RleDecoder {
- public:
-  LevelDecoder(bool is_def_level_decoder)
-    : cached_levels_(NULL),
-      num_cached_levels_(0),
-      cached_level_idx_(0),
-      encoding_(parquet::Encoding::PLAIN),
-      max_level_(0),
-      cache_size_(0),
-      num_buffered_values_(0),
-      decoding_error_code_(is_def_level_decoder ?
-          TErrorCode::PARQUET_DEF_LEVEL_ERROR : TErrorCode::PARQUET_REP_LEVEL_ERROR) {
-  }
-
-  /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
-  /// encoding requires reading metadata from the page header.
-  Status Init(const string& filename, parquet::Encoding::type encoding,
-      MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values,
-      uint8_t** data, int* data_size);
-
-  /// Returns the next level or INVALID_LEVEL if there was an error.
-  inline int16_t ReadLevel();
-
-  /// Decodes and caches the next batch of levels. Resets members associated with the
-  /// cache. Returns a non-ok status if there was a problem decoding a level, or if a
-  /// level was encountered with a value greater than max_level_.
-  Status CacheNextBatch(int batch_size);
-
-  /// Functions for working with the level cache.
-  inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }
-  inline uint8_t CacheGetNext() {
-    DCHECK_LT(cached_level_idx_, num_cached_levels_);
-    return cached_levels_[cached_level_idx_++];
-  }
-  inline void CacheSkipLevels(int num_levels) {
-    DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_);
-    cached_level_idx_ += num_levels;
-  }
-  inline int CacheSize() const { return num_cached_levels_; }
-  inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
-  inline int CacheCurrIdx() const { return cached_level_idx_; }
-
- private:
-  /// Initializes members associated with the level cache. Allocates memory for
-  /// the cache from pool, if necessary.
-  Status InitCache(MemPool* pool, int cache_size);
-
-  /// Decodes and writes a batch of levels into the cache. Sets the number of
-  /// values written to the cache in *num_cached_levels. Returns false if there was
-  /// an error decoding a level or if there was a level value greater than max_level_.
-  bool FillCache(int batch_size, int* num_cached_levels);
-
-  /// Buffer for a batch of levels. The memory is allocated and owned by a pool in
-  /// passed in Init().
-  uint8_t* cached_levels_;
-  /// Number of valid level values in the cache.
-  int num_cached_levels_;
-  /// Current index into cached_levels_.
-  int cached_level_idx_;
-  parquet::Encoding::type encoding_;
-
-  /// For error checking and reporting.
-  int max_level_;
-  /// Number of level values cached_levels_ has memory allocated for.
-  int cache_size_;
-  /// Number of remaining data values in the current data page.
-  int num_buffered_values_;
-  string filename_;
-  TErrorCode::type decoding_error_code_;
-};
-
-/// Base class for reading a column. Reads a logical column, not necessarily a column
-/// materialized in the file (e.g. collections). The two subclasses are
-/// BaseScalarColumnReader and CollectionColumnReader. Column readers read one def and rep
-/// level pair at a time. The current def and rep level are exposed to the user, and the
-/// corresponding value (if defined) can optionally be copied into a slot via
-/// ReadValue(). Can also write position slots.
-class HdfsParquetScanner::ColumnReader {
- public:
-  virtual ~ColumnReader() { }
-
-  int def_level() const { return def_level_; }
-  int rep_level() const { return rep_level_; }
-
-  const SlotDescriptor* slot_desc() const { return slot_desc_; }
-  const parquet::SchemaElement& schema_element() const { return *node_.element; }
-  int16_t max_def_level() const { return max_def_level_; }
-  int16_t max_rep_level() const { return max_rep_level_; }
-  int def_level_of_immediate_repeated_ancestor() const {
-    return node_.def_level_of_immediate_repeated_ancestor;
-  }
-  const SlotDescriptor* pos_slot_desc() const { return pos_slot_desc_; }
-  void set_pos_slot_desc(const SlotDescriptor* pos_slot_desc) {
-    DCHECK(pos_slot_desc_ == NULL);
-    pos_slot_desc_ = pos_slot_desc;
-  }
-
-  /// Returns true if this reader materializes collections (i.e. CollectionValues).
-  virtual bool IsCollectionReader() const { return false; }
-
-  const char* filename() const { return parent_->filename(); };
-
-  /// Read the current value (or null) into 'tuple' for this column. This should only be
-  /// called when a value is defined, i.e., def_level() >=
-  /// def_level_of_immediate_repeated_ancestor() (since empty or NULL collections produce
-  /// no output values), otherwise NextLevels() should be called instead.
-  ///
-  /// Advances this column reader to the next value (i.e. NextLevels() doesn't need to be
-  /// called after calling ReadValue()).
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  ///
-  /// NextLevels() must be called on this reader before calling ReadValue() for the first
-  /// time. This is to initialize the current value that ReadValue() will read.
-  ///
-  /// TODO: this is the function that needs to be codegen'd (e.g. CodegenReadValue())
-  /// The codegened functions from all the materialized cols will then be combined
-  /// into one function.
-  /// TODO: another option is to materialize col by col for the entire row batch in
-  /// one call.  e.g. MaterializeCol would write out 1024 values.  Our row batches
-  /// are currently dense so we'll need to figure out something there.
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) = 0;
-
-  /// Same as ReadValue() but does not advance repetition level. Only valid for columns
-  /// not in collections.
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
-
-  /// Returns true if this reader needs to be seeded with NextLevels() before
-  /// calling ReadValueBatch() or ReadNonRepeatedValueBatch().
-  /// Note that all readers need to be seeded before calling the non-batched ReadValue().
-  virtual bool NeedsSeedingForBatchedReading() const { return true; }
-
-  /// Batched version of ReadValue() that reads up to max_values at once and materializes
-  /// them into tuples in tuple_mem. Returns the number of values actually materialized
-  /// in *num_values. The return value, error behavior and state changes are generally
-  /// the same as in ReadValue(). For example, if an error occurs in the middle of
-  /// materializing a batch then false is returned, and num_values, tuple_mem, as well as
-  /// this column reader are left in an undefined state, assuming that the caller will
-  /// immediately abort execution.
-  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values);
-
-  /// Batched version of ReadNonRepeatedValue() that reads up to max_values at once and
-  /// materializes them into tuples in tuple_mem.
-  /// The return value and error behavior are the same as in ReadValueBatch().
-  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values);
-
-  /// Advances this column reader's def and rep levels to the next logical value, i.e. to
-  /// the next scalar value or the beginning of the next collection, without attempting to
-  /// read the value. This is used to skip past def/rep levels that don't materialize a
-  /// value, such as the def/rep levels corresponding to an empty containing collection.
-  ///
-  /// NextLevels() must be called on this reader before calling ReadValue() for the first
-  /// time. This is to initialize the current value that ReadValue() will read.
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  virtual bool NextLevels() = 0;
-
-  /// Should only be called if pos_slot_desc_ is non-NULL. Writes pos_current_value_ to
-  /// 'tuple' (i.e. "reads" the synthetic position field of the parent collection into
-  /// 'tuple') and increments pos_current_value_.
-  void ReadPosition(Tuple* tuple);
-
-  /// Returns true if this column reader has reached the end of the row group.
-  inline bool RowGroupAtEnd() { return rep_level_ == ROW_GROUP_END; }
-
- protected:
-  HdfsParquetScanner* parent_;
-  const SchemaNode& node_;
-  const SlotDescriptor* slot_desc_;
-
-  /// The slot descriptor for the position field of the tuple, if there is one. NULL if
-  /// there's not. Only one column reader for a given tuple desc will have this set.
-  const SlotDescriptor* pos_slot_desc_;
-
-  /// The next value to write into the position slot, if there is one. 64-bit int because
-  /// the pos slot is always a BIGINT Set to -1 when this column reader does not have a
-  /// current rep and def level (i.e. before the first NextLevels() call or after the last
-  /// value in the column has been read).
-  int64_t pos_current_value_;
-
-  /// The current repetition and definition levels of this reader. Advanced via
-  /// ReadValue() and NextLevels(). Set to -1 when this column reader does not have a
-  /// current rep and def level (i.e. before the first NextLevels() call or after the last
-  /// value in the column has been read). If this is not inside a collection, rep_level_ is
-  /// always 0.
-  /// int16_t is large enough to hold the valid levels 0-255 and sentinel value -1.
-  /// The maximum values are cached here because they are accessed in inner loops.
-  int16_t rep_level_;
-  int16_t max_rep_level_;
-  int16_t def_level_;
-  int16_t max_def_level_;
-
-  // Cache frequently accessed members of slot_desc_ for perf.
-
-  /// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL.
-  int tuple_offset_;
-
-  /// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL.
-  NullIndicatorOffset null_indicator_offset_;
-
-  ColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : parent_(parent),
-      node_(node),
-      slot_desc_(slot_desc),
-      pos_slot_desc_(NULL),
-      pos_current_value_(INVALID_POS),
-      rep_level_(INVALID_LEVEL),
-      max_rep_level_(node_.max_rep_level),
-      def_level_(INVALID_LEVEL),
-      max_def_level_(node_.max_def_level),
-      tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()),
-      null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset(-1, -1) :
-          slot_desc->null_indicator_offset()) {
-    DCHECK_GE(node_.max_rep_level, 0);
-    DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max());
-    DCHECK_GE(node_.max_def_level, 0);
-    DCHECK_LE(node_.max_def_level, std::numeric_limits<int16_t>::max());
-    // rep_level_ is always valid and equal to 0 if col not in collection.
-    if (max_rep_level() == 0) rep_level_ = 0;
-  }
-};
-
-/// Collections are not materialized directly in parquet files; only scalar values appear
-/// in the file. CollectionColumnReader uses the definition and repetition levels of child
-/// column readers to figure out the boundaries of each collection in this column.
-class HdfsParquetScanner::CollectionColumnReader :
-      public HdfsParquetScanner::ColumnReader {
- public:
-  CollectionColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : ColumnReader(parent, node, slot_desc) {
-    DCHECK(node_.is_repeated());
-    if (slot_desc != NULL) DCHECK(slot_desc->type().IsCollectionType());
-  }
-
-  virtual ~CollectionColumnReader() { }
-
-  vector<ColumnReader*>* children() { return &children_; }
-
-  virtual bool IsCollectionReader() const { return true; }
-
-  /// The repetition level indicating that the current value is the first in a new
-  /// collection (meaning the last value read was the final item in the previous
-  /// collection).
-  int new_collection_rep_level() const { return max_rep_level() - 1; }
-
-  /// Materializes CollectionValue into tuple slot (if materializing) and advances to next
-  /// value.
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple);
-
-  /// Same as ReadValue but does not advance repetition level. Only valid for columns not
-  /// in collections.
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple);
-
-  /// Advances all child readers to the beginning of the next collection and updates this
-  /// reader's state.
-  virtual bool NextLevels();
-
-  /// This is called once for each row group in the file.
-  void Reset() {
-    def_level_ = -1;
-    rep_level_ = -1;
-    pos_current_value_ = -1;
-  }
-
- private:
-  /// Column readers of fields contained within this collection. There is at least one
-  /// child reader per collection reader. Child readers either materialize slots in the
-  /// collection item tuples, or there is a single child reader that does not materialize
-  /// any slot and is only used by this reader to read def and rep levels.
-  vector<ColumnReader*> children_;
-
-  /// Updates this reader's def_level_, rep_level_, and pos_current_value_ based on child
-  /// reader's state.
-  void UpdateDerivedState();
-
-  /// Recursively reads from children_ to assemble a single CollectionValue into
-  /// *slot. Also advances rep_level_ and def_level_ via NextLevels().
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  inline bool ReadSlot(void* slot, MemPool* pool);
-};
-
-/// Reader for a single column from the parquet file.  It's associated with a
-/// ScannerContext::Stream and is responsible for decoding the data.  Super class for
-/// per-type column readers. This contains most of the logic, the type specific functions
-/// must be implemented in the subclass.
-class HdfsParquetScanner::BaseScalarColumnReader :
-      public HdfsParquetScanner::ColumnReader {
- public:
-  BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : ColumnReader(parent, node, slot_desc),
-      def_levels_(true),
-      rep_levels_(false),
-      num_buffered_values_(0),
-      num_values_read_(0),
-      metadata_(NULL),
-      stream_(NULL),
-      decompressed_data_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
-    DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
-
-  }
-
-  virtual ~BaseScalarColumnReader() { }
-
-  /// This is called once for each row group in the file.
-  Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) {
-    DCHECK(stream != NULL);
-    DCHECK(metadata != NULL);
-
-    num_buffered_values_ = 0;
-    data_ = NULL;
-    data_end_ = NULL;
-    stream_ = stream;
-    metadata_ = metadata;
-    num_values_read_ = 0;
-    def_level_ = -1;
-    // See ColumnReader constructor.
-    rep_level_ = max_rep_level() == 0 ? 0 : -1;
-    pos_current_value_ = -1;
-
-    if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
-      RETURN_IF_ERROR(Codec::CreateDecompressor(
-          NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
-    }
-    ClearDictionaryDecoder();
-    return Status::OK();
-  }
-
-  /// Called once when the scanner is complete for final cleanup.
-  void Close() {
-    if (decompressor_.get() != NULL) decompressor_->Close();
-  }
-
-  int64_t total_len() const { return metadata_->total_compressed_size; }
-  int col_idx() const { return node_.col_idx; }
-  THdfsCompression::type codec() const {
-    if (metadata_ == NULL) return THdfsCompression::NONE;
-    return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
-  }
-  MemPool* decompressed_data_pool() const { return decompressed_data_pool_.get(); }
-
-  /// Reads the next definition and repetition levels for this column. Initializes the
-  /// next data page if necessary.
-  virtual bool NextLevels() { return NextLevels<true>(); }
-
-  // TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) if
-  // we know this row can be skipped. This could be very useful with stats and big
-  // sections can be skipped. Implement that when we can benefit from it.
-
- protected:
-  // Friend parent scanner so it can perform validation (e.g. ValidateEndOfRowGroup())
-  friend class HdfsParquetScanner;
-
-  // Class members that are accessed for every column should be included up here so they
-  // fit in as few cache lines as possible.
-
-  /// Pointer to start of next value in data page
-  uint8_t* data_;
-
-  /// End of the data page.
-  const uint8_t* data_end_;
-
-  /// Decoder for definition levels.
-  LevelDecoder def_levels_;
-
-  /// Decoder for repetition levels.
-  LevelDecoder rep_levels_;
-
-  /// Page encoding for values. Cached here for perf.
-  parquet::Encoding::type page_encoding_;
-
-  /// Num values remaining in the current data page
-  int num_buffered_values_;
-
-  // Less frequently used members that are not accessed in inner loop should go below
-  // here so they do not occupy precious cache line space.
-
-  /// The number of values seen so far. Updated per data page.
-  int64_t num_values_read_;
-
-  const parquet::ColumnMetaData* metadata_;
-  scoped_ptr<Codec> decompressor_;
-  ScannerContext::Stream* stream_;
-
-  /// Pool to allocate decompression buffers from.
-  boost::scoped_ptr<MemPool> decompressed_data_pool_;
-
-  /// Header for current data page.
-  parquet::PageHeader current_page_header_;
-
-  /// Read the next data page. If a dictionary page is encountered, that will be read and
-  /// this function will continue reading the next data page.
-  Status ReadDataPage();
-
-  /// Try to move the the next page and buffer more values. Return false and sets rep_level_,
-  /// def_level_ and pos_current_value_ to -1 if no more pages or an error encountered.
-  bool NextPage();
-
-  /// Implementation for NextLevels().
-  template <bool ADVANCE_REP_LEVEL>
-  bool NextLevels();
-
-  /// Creates a dictionary decoder from values/size. 'decoder' is set to point to a
-  /// dictionary decoder stored in this object. Subclass must implement this. Returns
-  /// an error status if the dictionary values could not be decoded successfully.
-  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) = 0;
-
-  /// Return true if the column has an initialized dictionary decoder. Subclass must
-  /// implement this.
-  virtual bool HasDictionaryDecoder() = 0;
-
-  /// Clear the dictionary decoder so HasDictionaryDecoder() will return false. Subclass
-  /// must implement this.
-  virtual void ClearDictionaryDecoder() = 0;
-
-  /// Initializes the reader with the data contents. This is the content for the entire
-  /// decompressed data page. Decoders can initialize state from here.
-  virtual Status InitDataPage(uint8_t* data, int size) = 0;
-
- private:
-  /// Writes the next value into *slot using pool if necessary. Also advances rep_level_
-  /// and def_level_ via NextLevels().
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  template <bool IN_COLLECTION>
-  inline bool ReadSlot(void* slot, MemPool* pool);
-};
-
-/// Per column type reader. If MATERIALIZED is true, the column values are materialized
-/// into the slot described by slot_desc. If MATERIALIZED is false, the column values
-/// are not materialized, but the position can be accessed.
-template<typename T, bool MATERIALIZED>
-class HdfsParquetScanner::ScalarColumnReader :
-      public HdfsParquetScanner::BaseScalarColumnReader {
- public:
-  ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : BaseScalarColumnReader(parent, node, slot_desc),
-      dict_decoder_init_(false) {
-    if (!MATERIALIZED) {
-      // We're not materializing any values, just counting them. No need (or ability) to
-      // initialize state used to materialize values.
-      DCHECK(slot_desc_ == NULL);
-      return;
-    }
-
-    DCHECK(slot_desc_ != NULL);
-    DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
-    if (slot_desc_->type().type == TYPE_DECIMAL) {
-      fixed_len_size_ = ParquetPlainEncoder::DecimalSize(slot_desc_->type());
-    } else if (slot_desc_->type().type == TYPE_VARCHAR) {
-      fixed_len_size_ = slot_desc_->type().len;
-    } else {
-      fixed_len_size_ = -1;
-    }
-    needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
-        // TODO: Add logic to detect file versions that have unconverted TIMESTAMP
-        // values. Currently all versions have converted values.
-        (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
-        slot_desc_->type().type == TYPE_TIMESTAMP &&
-        parent->file_version_.application == "parquet-mr");
-  }
-
-  virtual ~ScalarColumnReader() { }
-
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<true>(pool, tuple);
-  }
-
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<false>(pool, tuple);
-  }
-
-  virtual bool NeedsSeedingForBatchedReading() const { return false; }
-
-  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) {
-    return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values);
-  }
-
-  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) {
-    return ReadValueBatch<false>(pool, max_values, tuple_size, tuple_mem, num_values);
-  }
-
- protected:
-  template <bool IN_COLLECTION>
-  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
-    // NextLevels() should have already been called and def and rep levels should be in
-    // valid range.
-    DCHECK_GE(rep_level_, 0);
-    DCHECK_LE(rep_level_, max_rep_level());
-    DCHECK_GE(def_level_, 0);
-    DCHECK_LE(def_level_, max_def_level());
-    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-        "Caller should have called NextLevels() until we are ready to read a value";
-
-    if (MATERIALIZED) {
-      if (def_level_ >= max_def_level()) {
-        if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
-          if (!ReadSlot<true>(tuple->GetSlot(tuple_offset_), pool)) return false;
-        } else {
-          if (!ReadSlot<false>(tuple->GetSlot(tuple_offset_), pool)) return false;
-        }
-      } else {
-        tuple->SetNull(null_indicator_offset_);
-      }
-    }
-    return NextLevels<IN_COLLECTION>();
-  }
-
-  /// Implementation of the ReadValueBatch() functions specialized for this
-  /// column reader type. This function drives the reading of data pages and
-  /// caching of rep/def levels. Once a data page and cached levels are available,
-  /// it calls into a more specialized MaterializeValueBatch() for doing the actual
-  /// value materialization using the level caches.
-  template<bool IN_COLLECTION>
-  bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) {
-    // Repetition level is only present if this column is nested in a collection type.
-    if (!IN_COLLECTION) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
-    if (IN_COLLECTION) DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
-
-    int val_count = 0;
-    bool continue_execution = true;
-    while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-      // Read next page if necessary.
-      if (num_buffered_values_ == 0) {
-        if (!NextPage()) {
-          continue_execution = parent_->parse_status_.ok();
-          continue;
-        }
-      }
-
-      // Fill def/rep level caches if they are empty.
-      int level_batch_size = min(parent_->state_->batch_size(), num_buffered_values_);
-      if (!def_levels_.CacheHasNext()) {
-        parent_->parse_status_.MergeStatus(def_levels_.CacheNextBatch(level_batch_size));
-      }
-      // We only need the repetition levels for populating the position slot since we
-      // are only populating top-level tuples.
-      if (IN_COLLECTION && pos_slot_desc_ != NULL && !rep_levels_.CacheHasNext()) {
-        parent_->parse_status_.MergeStatus(rep_levels_.CacheNextBatch(level_batch_size));
-      }
-      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-
-      // This special case is most efficiently handled here directly.
-      if (!MATERIALIZED && !IN_COLLECTION) {
-        int vals_to_add = min(def_levels_.CacheRemaining(), max_values - val_count);
-        val_count += vals_to_add;
-        def_levels_.CacheSkipLevels(vals_to_add);
-        num_buffered_values_ -= vals_to_add;
-        continue;
-      }
-
-      // Read data page and cached levels to materialize values.
-      int cache_start_idx = def_levels_.CacheCurrIdx();
-      uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
-      int remaining_val_capacity = max_values - val_count;
-      int ret_val_count = 0;
-      if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
-        continue_execution = MaterializeValueBatch<IN_COLLECTION, true>(
-            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
-      } else {
-        continue_execution = MaterializeValueBatch<IN_COLLECTION, false>(
-            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
-      }
-      val_count += ret_val_count;
-      num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
-    }
-    *num_values = val_count;
-    return continue_execution;
-  }
-
-  /// Helper function for ReadValueBatch() above that performs value materialization.
-  /// It assumes a data page with remaining values is available, and that the def/rep
-  /// level caches have been populated.
-  /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not
-  /// handled in this function.
-  template<bool IN_COLLECTION, bool IS_DICT_ENCODED>
-  bool MaterializeValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values) {
-    DCHECK(MATERIALIZED || IN_COLLECTION);
-    DCHECK_GT(num_buffered_values_, 0);
-    DCHECK(def_levels_.CacheHasNext());
-    if (IN_COLLECTION && pos_slot_desc_ != NULL) DCHECK(rep_levels_.CacheHasNext());
-
-    uint8_t* curr_tuple = tuple_mem;
-    int val_count = 0;
-    while (def_levels_.CacheHasNext()) {
-      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
-      int def_level = def_levels_.CacheGetNext();
-
-      if (IN_COLLECTION) {
-        if (def_level < def_level_of_immediate_repeated_ancestor()) {
-          // A containing repeated field is empty or NULL. Skip the value but
-          // move to the next repetition level if necessary.
-          if (pos_slot_desc_ != NULL) rep_levels_.CacheGetNext();
-          continue;
-        }
-        if (pos_slot_desc_ != NULL) {
-          int rep_level = rep_levels_.CacheGetNext();
-          // Reset position counter if we are at the start of a new parent collection.
-          if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
-          void* pos_slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
-          *reinterpret_cast<int64_t*>(pos_slot) = pos_current_value_++;
-        }
-      }
-
-      if (MATERIALIZED) {
-        if (def_level >= max_def_level()) {
-          bool continue_execution =
-              ReadSlot<IS_DICT_ENCODED>(tuple->GetSlot(tuple_offset_), pool);
-          if (UNLIKELY(!continue_execution)) return false;
-        } else {
-          tuple->SetNull(null_indicator_offset_);
-        }
-      }
-
-      curr_tuple += tuple_size;
-      ++val_count;
-      if (UNLIKELY(val_count == max_values)) break;
-    }
-    *num_values = val_count;
-    return true;
-  }
-
-  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) {
-    if (!dict_decoder_.Reset(values, size, fixed_len_size_)) {
-        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
-            slot_desc_->type().DebugString(), "could not decode dictionary");
-    }
-    dict_decoder_init_ = true;
-    *decoder = &dict_decoder_;
-    return Status::OK();
-  }
-
-  virtual bool HasDictionaryDecoder() {
-    return dict_decoder_init_;
-  }
-
-  virtual void ClearDictionaryDecoder() {
-    dict_decoder_init_ = false;
-  }
-
-  virtual Status InitDataPage(uint8_t* data, int size) {
-    page_encoding_ = current_page_header_.data_page_header.encoding;
-    if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
-        page_encoding_ != parquet::Encoding::PLAIN) {
-      stringstream ss;
-      ss << "File '" << filename() << "' is corrupt: unexpected encoding: "
-         << PrintEncoding(page_encoding_) << " for data page of column '"
-         << schema_element().name << "'.";
-      return Status(ss.str());
-    }
-
-    // If slot_desc_ is NULL, dict_decoder_ is uninitialized
-    if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY && slot_desc_ != NULL) {
-      if (!dict_decoder_init_) {
-        return Status("File corrupt. Missing dictionary page.");
-      }
-      dict_decoder_.SetData(data, size);
-    }
-
-    // TODO: Perform filter selectivity checks here.
-    return Status::OK();
-  }
-
- private:
-  /// Writes the next value into *slot using pool if necessary.
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  template<bool IS_DICT_ENCODED>
-  inline bool ReadSlot(void* slot, MemPool* pool) {
-    T val;
-    T* val_ptr = NeedsConversion() ? &val : reinterpret_cast<T*>(slot);
-    if (IS_DICT_ENCODED) {
-      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
-      if (UNLIKELY(!dict_decoder_.GetValue(val_ptr))) {
-        SetDictDecodeError();
-        return false;
-      }
-    } else {
-      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN);
-      int encoded_len =
-          ParquetPlainEncoder::Decode<T>(data_, data_end_, fixed_len_size_, val_ptr);
-      if (UNLIKELY(encoded_len < 0)) {
-        SetPlainDecodeError();
-        return false;
-      }
-      data_ += encoded_len;
-    }
-    if (UNLIKELY(NeedsConversion() &&
-            !ConvertSlot(&val, reinterpret_cast<T*>(slot), pool))) {
-      return false;
-    }
-    return true;
-  }
-
-  /// Most column readers never require conversion, so we can avoid branches by
-  /// returning constant false. Column readers for types that require conversion
-  /// must specialize this function.
-  inline bool NeedsConversion() const {
-    DCHECK(!needs_conversion_);
-    return false;
-  }
-
-  /// Converts and writes src into dst based on desc_->type()
-  bool ConvertSlot(const T* src, T* dst, MemPool* pool) {
-    DCHECK(false);
-    return false;
-  }
-
-  /// Pull out slow-path Status construction code from ReadRepetitionLevel()/
-  /// ReadDefinitionLevel() for performance.
-  void __attribute__((noinline)) SetDictDecodeError() {
-    parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(),
-        slot_desc_->type().DebugString(), stream_->file_offset());
-  }
-  void __attribute__((noinline)) SetPlainDecodeError() {
-    parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(),
-        slot_desc_->type().DebugString(), stream_->file_offset());
-  }
-
-  /// Dictionary decoder for decoding column values.
-  DictDecoder<T> dict_decoder_;
-
-  /// True if dict_decoder_ has been initialized with a dictionary page.
-  bool dict_decoder_init_;
-
-  /// true if decoded values must be converted before being written to an output tuple.
-  bool needs_conversion_;
-
-  /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
-  /// the max length for VARCHAR columns. Unused otherwise.
-  int fixed_len_size_;
-};
-
-template<>
-inline bool HdfsParquetScanner::ScalarColumnReader<StringValue, true>::NeedsConversion() const {
-  return needs_conversion_;
-}
-
-template<>
-bool HdfsParquetScanner::ScalarColumnReader<StringValue, true>::ConvertSlot(
-    const StringValue* src, StringValue* dst, MemPool* pool) {
-  DCHECK(slot_desc() != NULL);
-  DCHECK(slot_desc()->type().type == TYPE_CHAR);
-  int len = slot_desc()->type().len;
-  StringValue sv;
-  sv.len = len;
-  if (slot_desc()->type().IsVarLenStringType()) {
-    sv.ptr = reinterpret_cast<char*>(pool->TryAllocate(len));
-    if (UNLIKELY(sv.ptr == NULL)) {
-      string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot",
-          len, "StringValue");
-      parent_->parse_status_ =
-          pool->mem_tracker()->MemLimitExceeded(parent_->state_, details, len);
-      return false;
-    }
-  } else {
-    sv.ptr = reinterpret_cast<char*>(dst);
-  }
-  int unpadded_len = min(len, src->len);
-  memcpy(sv.ptr, src->ptr, unpadded_len);
-  StringValue::PadWithSpaces(sv.ptr, len, unpadded_len);
-
-  if (slot_desc()->type().IsVarLenStringType()) *dst = sv;
-  return true;
-}
-
-template<>
-inline bool HdfsParquetScanner::ScalarColumnReader<TimestampValue, true>::NeedsConversion() const {
-  return needs_conversion_;
-}
-
-template<>
-bool HdfsParquetScanner::ScalarColumnReader<TimestampValue, true>::ConvertSlot(
-    const TimestampValue* src, TimestampValue* dst, MemPool* pool) {
-  // Conversion should only happen when this flag is enabled.
-  DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
-  *dst = *src;
-  if (dst->HasDateAndTime()) dst->UtcToLocal();
-  return true;
-}
-
-class HdfsParquetScanner::BoolColumnReader :
-      public HdfsParquetScanner::BaseScalarColumnReader {
- public:
-  BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : BaseScalarColumnReader(parent, node, slot_desc) {
-    if (slot_desc_ != NULL) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
-  }
-
-  virtual ~BoolColumnReader() { }
-
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<true>(pool, tuple);
-  }
-
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
-    return ReadValue<false>(pool, tuple);
-  }
-
- protected:
-  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) {
-    DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
-                  << "have gotten this far.";
-    return Status::OK();
-  }
-
-  virtual bool HasDictionaryDecoder() {
-    // Decoder should never be created for bools.
-    return false;
-  }
-
-  virtual void ClearDictionaryDecoder() { }
-
-  virtual Status InitDataPage(uint8_t* data, int size) {
-    // Initialize bool decoder
-    bool_values_ = BitReader(data, size);
-    return Status::OK();
-  }
-
- private:
-  template<bool IN_COLLECTION>
-  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
-    DCHECK(slot_desc_ != NULL);
-    // Def and rep levels should be in valid range.
-    DCHECK_GE(rep_level_, 0);
-    DCHECK_LE(rep_level_, max_rep_level());
-    DCHECK_GE(def_level_, 0);
-    DCHECK_LE(def_level_, max_def_level());
-    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-        "Caller should have called NextLevels() until we are ready to read a value";
-
-    if (def_level_ >= max_def_level()) {
-      return ReadSlot<IN_COLLECTION>(tuple->GetSlot(tuple_offset_), pool);
-    } else {
-      // Null value
-      tuple->SetNull(null_indicator_offset_);
-      return NextLevels<IN_COLLECTION>();
-    }
-  }
-
-  /// Writes the next value into *slot using pool if necessary. Also advances def_level_
-  /// and rep_level_ via NextLevels().
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  template <bool IN_COLLECTION>
-  inline bool ReadSlot(void* slot, MemPool* pool)  {
-    if (!bool_values_.GetValue(1, reinterpret_cast<bool*>(slot))) {
-      parent_->parse_status_ = Status("Invalid bool column.");
-      return false;
-    }
-    return NextLevels<IN_COLLECTION>();
-  }
-
-  BitReader bool_values_;
-};
-
-}
-
 Status HdfsParquetScanner::Prepare(ScannerContext* context) {
   RETURN_IF_ERROR(HdfsScanner::Prepare(context));
   metadata_range_ = stream_->scan_range();
@@ -1154,16 +169,16 @@ void HdfsParquetScanner::Close() {
   vector<THdfsCompression::type> compression_types;
 
   // Visit each column reader, including collection reader children.
-  stack<ColumnReader*> readers;
-  for (ColumnReader* r: column_readers_) readers.push(r);
+  stack<ParquetColumnReader*> readers;
+  for (ParquetColumnReader* r: column_readers_) readers.push(r);
   while (!readers.empty()) {
-    ColumnReader* col_reader = readers.top();
+    ParquetColumnReader* col_reader = readers.top();
     readers.pop();
 
     if (col_reader->IsCollectionReader()) {
       CollectionColumnReader* collection_reader =
           static_cast<CollectionColumnReader*>(col_reader);
-      for (ColumnReader* r: *collection_reader->children()) readers.push(r);
+      for (ParquetColumnReader* r: *collection_reader->children()) readers.push(r);
       continue;
     }
 
@@ -1207,629 +222,6 @@ void HdfsParquetScanner::Close() {
   HdfsScanner::Close();
 }
 
-HdfsParquetScanner::ColumnReader* HdfsParquetScanner::CreateReader(
-    const SchemaNode& node, bool is_collection_field, const SlotDescriptor* slot_desc) {
-  ColumnReader* reader = NULL;
-  if (is_collection_field) {
-    // Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
-    reader = new CollectionColumnReader(this, node, slot_desc);
-  } else if (slot_desc != NULL) {
-    // Create the appropriate ScalarColumnReader type to read values into 'slot_desc'
-    switch (slot_desc->type().type) {
-      case TYPE_BOOLEAN:
-        reader = new BoolColumnReader(this, node, slot_desc);
-        break;
-      case TYPE_TINYINT:
-        reader = new ScalarColumnReader<int8_t, true>(this, node, slot_desc);
-        break;
-      case TYPE_SMALLINT:
-        reader = new ScalarColumnReader<int16_t, true>(this, node, slot_desc);
-        break;
-      case TYPE_INT:
-        reader = new ScalarColumnReader<int32_t, true>(this, node, slot_desc);
-        break;
-      case TYPE_BIGINT:
-        reader = new ScalarColumnReader<int64_t, true>(this, node, slot_desc);
-        break;
-      case TYPE_FLOAT:
-        reader = new ScalarColumnReader<float, true>(this, node, slot_desc);
-        break;
-      case TYPE_DOUBLE:
-        reader = new ScalarColumnReader<double, true>(this, node, slot_desc);
-        break;
-      case TYPE_TIMESTAMP:
-        reader = new ScalarColumnReader<TimestampValue, true>(this, node, slot_desc);
-        break;
-      case TYPE_STRING:
-      case TYPE_VARCHAR:
-      case TYPE_CHAR:
-        reader = new ScalarColumnReader<StringValue, true>(this, node, slot_desc);
-        break;
-      case TYPE_DECIMAL:
-        switch (slot_desc->type().GetByteSize()) {
-          case 4:
-            reader = new ScalarColumnReader<Decimal4Value, true>(this, node, slot_desc);
-            break;
-          case 8:
-            reader = new ScalarColumnReader<Decimal8Value, true>(this, node, slot_desc);
-            break;
-          case 16:
-            reader = new ScalarColumnReader<Decimal16Value, true>(this, node, slot_desc);
-            break;
-        }
-        break;
-      default:
-        DCHECK(false) << slot_desc->type().DebugString();
-    }
-  } else {
-    // Special case for counting scalar values (e.g. count(*), no materialized columns in
-    // the file, only materializing a position slot). We won't actually read any values,
-    // only the rep and def levels, so it doesn't matter what kind of reader we make.
-    reader = new ScalarColumnReader<int8_t, false>(this, node, slot_desc);
-  }
-  return obj_pool_.Add(reader);
-}
-
-bool HdfsParquetScanner::ColumnReader::ReadValueBatch(MemPool* pool, int max_values,
-    int tuple_size, uint8_t* tuple_mem, int* num_values) {
-  int val_count = 0;
-  bool continue_execution = true;
-  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
-    if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
-      // A containing repeated field is empty or NULL
-      continue_execution = NextLevels();
-      continue;
-    }
-    // Fill in position slot if applicable
-    if (pos_slot_desc_ != NULL) ReadPosition(tuple);
-    continue_execution = ReadValue(pool, tuple);
-    ++val_count;
-  }
-  *num_values = val_count;
-  return continue_execution;
-}
-
-bool HdfsParquetScanner::ColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
-    int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
-  int val_count = 0;
-  bool continue_execution = true;
-  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
-    continue_execution = ReadNonRepeatedValue(pool, tuple);
-    ++val_count;
-  }
-  *num_values = val_count;
-  return continue_execution;
-}
-
-void HdfsParquetScanner::ColumnReader::ReadPosition(Tuple* tuple) {
-  DCHECK(pos_slot_desc() != NULL);
-  // NextLevels() should have already been called
-  DCHECK_GE(rep_level_, 0);
-  DCHECK_GE(def_level_, 0);
-  DCHECK_GE(pos_current_value_, 0);
-  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-      "Caller should have called NextLevels() until we are ready to read a value";
-
-  void* slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
-  *reinterpret_cast<int64_t*>(slot) = pos_current_value_++;
-}
-
-// In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
-// if this matches those versions and compatibility workarounds need to be used.
-static bool RequiresSkippedDictionaryHeaderCheck(
-    const HdfsParquetScanner::FileVersion& v) {
-  if (v.application != "impala") return false;
-  return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
-}
-
-Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
-  Status status;
-  uint8_t* buffer;
-
-  // We're about to move to the next data page.  The previous data page is
-  // now complete, pass along the memory allocated for it.
-  parent_->scratch_batch_->mem_pool()->AcquireData(decompressed_data_pool_.get(), false);
-
-  // Read the next data page, skipping page types we don't care about.
-  // We break out of this loop on the non-error case (a data page was found or we read all
-  // the pages).
-  while (true) {
-    DCHECK_EQ(num_buffered_values_, 0);
-    if (num_values_read_ == metadata_->num_values) {
-      // No more pages to read
-      // TODO: should we check for stream_->eosr()?
-      break;
-    } else if (num_values_read_ > metadata_->num_values) {
-      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
-          metadata_->num_values, num_values_read_, node_.element->name, filename());
-      RETURN_IF_ERROR(parent_->LogOrReturnError(msg));
-      return Status::OK();
-    }
-
-    int64_t buffer_size;
-    RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
-    if (buffer_size == 0) {
-      // The data pages contain fewer values than stated in the column metadata.
-      DCHECK(stream_->eosr());
-      DCHECK_LT(num_values_read_, metadata_->num_values);
-      // TODO for 2.3: node_.element->name isn't necessarily useful
-      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
-          metadata_->num_values, num_values_read_, node_.element->name, filename());
-      RETURN_IF_ERROR(parent_->LogOrReturnError(msg));
-      return Status::OK();
-    }
-
-    // We don't know the actual header size until the thrift object is deserialized.  Loop
-    // until we successfully deserialize the header or exceed the maximum header size.
-    uint32_t header_size;
-    while (true) {
-      header_size = buffer_size;
-      status = DeserializeThriftMsg(
-          buffer, &header_size, true, &current_page_header_);
-      if (status.ok()) break;
-
-      if (buffer_size >= FLAGS_max_page_header_size) {
-        stringstream ss;
-        ss << "ParquetScanner: could not read data page because page header exceeded "
-           << "maximum size of "
-           << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
-        status.AddDetail(ss.str());
-        return status;
-      }
-
-      // Didn't read entire header, increase buffer size and try again
-      Status status;
-      int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
-      bool success = stream_->GetBytes(
-          new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
-      if (!success) {
-        DCHECK(!status.ok());
-        return status;
-      }
-      DCHECK(status.ok());
-
-      if (buffer_size == new_buffer_size) {
-        DCHECK_NE(new_buffer_size, 0);
-        return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
-      }
-      DCHECK_GT(new_buffer_size, buffer_size);
-      buffer_size = new_buffer_size;
-    }
-
-    // Successfully deserialized current_page_header_
-    if (!stream_->SkipBytes(header_size, &status)) return status;
-
-    int data_size = current_page_header_.compressed_page_size;
-    int uncompressed_size = current_page_header_.uncompressed_page_size;
-
-    if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
-      if (slot_desc_ == NULL) {
-        // Skip processing the dictionary page if we don't need to decode any values. In
-        // addition to being unnecessary, we are likely unable to successfully decode the
-        // dictionary values because we don't necessarily create the right type of scalar
-        // reader if there's no slot to read into (see CreateReader()).
-        if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
-        continue;
-      }
-
-      if (HasDictionaryDecoder()) {
-        return Status("Column chunk should not contain two dictionary pages.");
-      }
-      if (node_.element->type == parquet::Type::BOOLEAN) {
-        return Status("Unexpected dictionary page. Dictionary page is not"
-            " supported for booleans.");
-      }
-      const parquet::DictionaryPageHeader* dict_header = NULL;
-      if (current_page_header_.__isset.dictionary_page_header) {
-        dict_header = &current_page_header_.dictionary_page_header;
-      } else {
-        if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
-          return Status("Dictionary page does not have dictionary header set.");
-        }
-      }
-      if (dict_header != NULL &&
-          dict_header->encoding != parquet::Encoding::PLAIN &&
-          dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) {
-        return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
-            "for dictionary pages.");
-      }
-
-      if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
-      data_end_ = data_ + data_size;
-
-      uint8_t* dict_values = NULL;
-      if (decompressor_.get() != NULL) {
-        dict_values = parent_->dictionary_pool_->TryAllocate(uncompressed_size);
-        if (UNLIKELY(dict_values == NULL)) {
-          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
-              uncompressed_size, "dictionary");
-          return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
-              parent_->state_, details, uncompressed_size);
-        }
-        RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
-            &uncompressed_size, &dict_values));
-        VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
-        if (current_page_header_.uncompressed_page_size != uncompressed_size) {
-          return Status(Substitute("Error decompressing dictionary page in file '$0'. "
-              "Expected $1 uncompressed bytes but got $2", filename(),
-              current_page_header_.uncompressed_page_size, uncompressed_size));
-        }
-        data_size = uncompressed_size;
-      } else {
-        if (current_page_header_.uncompressed_page_size != data_size) {
-          return Status(Substitute("Error reading dictionary page in file '$0'. "
-              "Expected $1 bytes but got $2", filename(),
-              current_page_header_.uncompressed_page_size, data_size));
-        }
-        // Copy dictionary from io buffer (which will be recycled as we read
-        // more data) to a new buffer
-        dict_values = parent_->dictionary_pool_->TryAllocate(data_size);
-        if (UNLIKELY(dict_values == NULL)) {
-          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
-              data_size, "dictionary");
-          return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
-              parent_->state_, details, data_size);
-        }
-        memcpy(dict_values, data_, data_size);
-      }
-
-      DictDecoderBase* dict_decoder;
-      RETURN_IF_ERROR(CreateDictionaryDecoder(dict_values, data_size, &dict_decoder));
-      if (dict_header != NULL &&
-          dict_header->num_values != dict_decoder->num_entries()) {
-        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
-            slot_desc_->type().DebugString(),
-            Substitute("Expected $0 entries but data contained $1 entries",
-            dict_header->num_values, dict_decoder->num_entries()));
-      }
-      // Done with dictionary page, read next page
-      continue;
-    }
-
-    if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
-      // We can safely skip non-data pages
-      if (!stream_->SkipBytes(data_size, &status)) return status;
-      continue;
-    }
-
-    // Read Data Page
-    // TODO: when we start using page statistics, we will need to ignore certain corrupt
-    // statistics. See IMPALA-2208 and PARQUET-251.
-    if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
-    data_end_ = data_ + data_size;
-    num_buffered_values_ = current_page_header_.data_page_header.num_values;
-    num_values_read_ += num_buffered_values_;
-
-    if (decompressor_.get() != NULL) {
-      SCOPED_TIMER(parent_->decompress_timer_);
-      uint8_t* decompressed_buffer =
-          decompressed_data_pool_->TryAllocate(uncompressed_size);
-      if (UNLIKELY(decompressed_buffer == NULL)) {
-        string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
-            uncompressed_size, "decompressed data");
-        return decompressed_data_pool_->mem_tracker()->MemLimitExceeded(
-            parent_->state_, details, uncompressed_size);
-      }
-      RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
-          current_page_header_.compressed_page_size, data_, &uncompressed_size,
-          &decompressed_buffer));
-      VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
-                << " to " << uncompressed_size;
-      if (current_page_header_.uncompressed_page_size != uncompressed_size) {
-        return Status(Substitute("Error decompressing data page in file '$0'. "
-            "Expected $1 uncompressed bytes but got $2", filename(),
-            current_page_header_.uncompressed_page_size, uncompressed_size));
-      }
-      data_ = decompressed_buffer;
-      data_size = current_page_header_.uncompressed_page_size;
-      data_end_ = data_ + data_size;
-    } else {
-      DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
-      if (current_page_header_.compressed_page_size != uncompressed_size) {
-        return Status(Substitute("Error reading data page in file '$0'. "
-            "Expected $1 bytes but got $2", filename(),
-            current_page_header_.compressed_page_size, uncompressed_size));
-      }
-    }
-
-    // Initialize the repetition level data
-    RETURN_IF_ERROR(rep_levels_.Init(filename(),
-        current_page_header_.data_page_header.repetition_level_encoding,
-        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
-        max_rep_level(), num_buffered_values_,
-        &data_, &data_size));
-
-    // Initialize the definition level data
-    RETURN_IF_ERROR(def_levels_.Init(filename(),
-        current_page_header_.data_page_header.definition_level_encoding,
-        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
-        max_def_level(), num_buffered_values_, &data_, &data_size));
-
-    // Data can be empty if the column contains all NULLs
-    if (data_size != 0) RETURN_IF_ERROR(InitDataPage(data_, data_size));
-    break;
-  }
-
-  return Status::OK();
-}
-
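
For reference, the header-decoding retry in the removed ReadDataPage() above peeks at a buffer, tries to deserialize the Thrift page header, and doubles the peeked size until it parses or exceeds FLAGS_max_page_header_size. A self-contained sketch of that retry shape, using a fake in-memory stream and a fake fixed-size header in place of DeserializeThriftMsg() and the scanner stream (names here are illustrative only, not part of this commit):

// A fake stream and fixed-size "header" stand in for ScannerContext::Stream
// and DeserializeThriftMsg(); only the retry/doubling shape is the point.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

struct FakeStream {
  std::vector<uint8_t> data;
  // Peek up to 'want' bytes without consuming them; returns how many are visible.
  int64_t Peek(int64_t want, const uint8_t** buf) const {
    *buf = data.data();
    return std::min<int64_t>(want, data.size());
  }
};

// Pretend the header parses once 'header_len' bytes are visible.
bool TryDeserialize(const uint8_t* buf, int64_t len, int64_t header_len) {
  (void)buf;
  return len >= header_len;
}

bool DecodeHeader(const FakeStream& stream, int64_t header_len,
                  int64_t max_header_size, int64_t* header_size) {
  const uint8_t* buffer;
  int64_t buffer_size = stream.Peek(1024, &buffer);
  while (true) {
    if (TryDeserialize(buffer, buffer_size, header_len)) {
      *header_size = header_len;                       // parsed successfully
      return true;
    }
    if (buffer_size >= max_header_size) return false;  // header exceeds the cap
    // Didn't see the whole header yet: double the peek window and try again.
    int64_t new_size = std::max<int64_t>(buffer_size * 2, 1024);
    int64_t got = stream.Peek(new_size, &buffer);
    if (got == buffer_size) return false;              // no more bytes (EOF)
    buffer_size = got;
  }
}

int main() {
  FakeStream s;
  s.data.resize(5000);
  int64_t header_size = 0;
  bool ok = DecodeHeader(s, 3000, 16 * 1024 * 1024, &header_size);
  std::printf("ok=%d header_size=%lld\n", ok, static_cast<long long>(header_size));
  return 0;
}
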
-Status HdfsParquetScanner::LevelDecoder::Init(const string& filename,
-    parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
-    int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
-  encoding_ = encoding;
-  max_level_ = max_level;
-  num_buffered_values_ = num_buffered_values;
-  filename_ = filename;
-  RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
-
-  // Return because there is no level data to read, e.g., required field.
-  if (max_level == 0) return Status::OK();
-
-  int32_t num_bytes = 0;
-  switch (encoding) {
-    case parquet::Encoding::RLE: {
-      Status status;
-      if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) {
-        return status;
-      }
-      if (num_bytes < 0) {
-        return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes);
-      }
-      int bit_width = Bits::Log2Ceiling64(max_level + 1);
-      Reset(*data, num_bytes, bit_width);
-      break;
-    }
-    case parquet::Encoding::BIT_PACKED:
-      num_bytes = BitUtil::Ceil(num_buffered_values, 8);
-      bit_reader_.Reset(*data, num_bytes);
-      break;
-    default: {
-      stringstream ss;
-      ss << "Unsupported encoding: " << encoding;
-      return Status(ss.str());
-    }
-  }
-  DCHECK_GT(num_bytes, 0);
-  *data += num_bytes;
-  *data_size -= num_bytes;
-  return Status::OK();
-}
-
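
The RLE branch above sizes each encoded level as bit_width = Bits::Log2Ceiling64(max_level + 1). A tiny standalone illustration of that relationship (not Impala's Bits utility):

#include <cstdint>
#include <cstdio>

// Number of bits needed to store values in [0, max_level]; mirrors the
// bit_width computation in LevelDecoder::Init().
int LevelBitWidth(int max_level) {
  int width = 0;
  while ((1 << width) < max_level + 1) ++width;
  return width;
}

int main() {
  // max_level 0 -> 0 bits (no level data), 1 -> 1 bit, 3 -> 2 bits, 7 -> 3 bits.
  for (int max_level : {0, 1, 2, 3, 7}) {
    std::printf("max_level=%d bit_width=%d\n", max_level, LevelBitWidth(max_level));
  }
  return 0;
}
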
-Status HdfsParquetScanner::LevelDecoder::InitCache(MemPool* pool, int cache_size) {
-  num_cached_levels_ = 0;
-  cached_level_idx_ = 0;
-  // Memory has already been allocated.
-  if (cached_levels_ != NULL) {
-    DCHECK_EQ(cache_size_, cache_size);
-    return Status::OK();
-  }
-
-  cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
-  if (cached_levels_ == NULL) {
-    return pool->mem_tracker()->MemLimitExceeded(
-        NULL, "Definition level cache", cache_size);
-  }
-  memset(cached_levels_, 0, cache_size);
-  cache_size_ = cache_size;
-  return Status::OK();
-}
-
-inline int16_t HdfsParquetScanner::LevelDecoder::ReadLevel() {
-  bool valid;
-  uint8_t level;
-  if (encoding_ == parquet::Encoding::RLE) {
-    valid = Get(&level);
-  } else {
-    DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
-    valid = bit_reader_.GetValue(1, &level);
-  }
-  return LIKELY(valid) ? level : INVALID_LEVEL;
-}
-
-Status HdfsParquetScanner::LevelDecoder::CacheNextBatch(int batch_size) {
-  DCHECK_LE(batch_size, cache_size_);
-  cached_level_idx_ = 0;
-  if (max_level_ > 0) {
-    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_))) {
-      return Status(decoding_error_code_, num_buffered_values_, filename_);
-    }
-  } else {
-    // No levels to read, e.g., because the field is required. The cache was
-    // already initialized with all zeros, so we can hand out those values.
-    DCHECK_EQ(max_level_, 0);
-    num_cached_levels_ = batch_size;
-  }
-  return Status::OK();
-}
-
-bool HdfsParquetScanner::LevelDecoder::FillCache(int batch_size,
-    int* num_cached_levels) {
-  DCHECK(num_cached_levels != NULL);
-  int num_values = 0;
-  if (encoding_ == parquet::Encoding::RLE) {
-    while (true) {
-      // Add RLE encoded values by repeating the current value this number of times.
-      uint32_t num_repeats_to_set =
-          min<uint32_t>(repeat_count_, batch_size - num_values);
-      memset(cached_levels_ + num_values, current_value_, num_repeats_to_set);
-      num_values += num_repeats_to_set;
-      repeat_count_ -= num_repeats_to_set;
-
-      // Add remaining literal values, if any.
-      uint32_t num_literals_to_set =
-          min<uint32_t>(literal_count_, batch_size - num_values);
-      int num_values_end = min<uint32_t>(num_values + literal_count_, batch_size);
-      for (; num_values < num_values_end; ++num_values) {
-        bool valid = bit_reader_.GetValue(bit_width_, &cached_levels_[num_values]);
-        if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
-      }
-      literal_count_ -= num_literals_to_set;
-
-      if (num_values == batch_size) break;
-      if (UNLIKELY(!NextCounts<int16_t>())) return false;
-      if (repeat_count_ > 0 && current_value_ > max_level_) return false;
-    }
-  } else {
-    DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
-    for (; num_values < batch_size; ++num_values) {
-      bool valid = bit_reader_.GetValue(1, &cached_levels_[num_values]);
-      if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
-    }
-  }
-  *num_cached_levels = num_values;
-  return true;
-}
-
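
FillCache() above alternates between two run types: a repeated run, expanded with a memset of the current value, and a literal run, decoded value by value. A compact sketch of that expansion over pre-decoded runs, with a simplified Run struct standing in for the real RLE decoder state (illustrative only):

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <vector>

// A pre-decoded run: either 'count' repeats of 'value', or a list of literals.
// (The real decoder also validates each level against max_level_ as it goes.)
struct Run {
  bool repeated;
  int count;                      // used when repeated
  uint8_t value;                  // used when repeated
  std::vector<uint8_t> literals;  // used when !repeated
};

// Expand runs into 'cache', up to 'batch_size' levels; returns levels filled.
int FillCache(const std::vector<Run>& runs, int batch_size, uint8_t* cache) {
  int n = 0;
  for (const Run& run : runs) {
    if (run.repeated) {
      // Repeated run: set the same level value 'count' times (or what fits).
      int to_set = std::min(run.count, batch_size - n);
      std::memset(cache + n, run.value, to_set);
      n += to_set;
    } else {
      // Literal run: copy values one by one.
      int to_set = std::min(static_cast<int>(run.literals.size()), batch_size - n);
      for (int i = 0; i < to_set; ++i) cache[n++] = run.literals[i];
    }
    if (n == batch_size) break;
  }
  return n;
}

int main() {
  std::vector<Run> runs = {
      {true, 4, 1, {}},           // four repeated 1s
      {false, 0, 0, {0, 1, 1}},   // three literal levels
  };
  uint8_t cache[8] = {};
  int n = FillCache(runs, 8, cache);
  for (int i = 0; i < n; ++i) std::printf("%d ", cache[i]);
  std::printf("(filled %d levels)\n", n);
  return 0;
}
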
-template <bool ADVANCE_REP_LEVEL>
-bool HdfsParquetScanner::BaseScalarColumnReader::NextLevels() {
-  if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
-
-  if (UNLIKELY(num_buffered_values_ == 0)) {
-    if (!NextPage()) return parent_->parse_status_.ok();
-  }
-  --num_buffered_values_;
-
-  // Definition level is not present if column and any containing structs are required.
-  def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
-
-  if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
-    // Repetition level is only present if this column is nested in any collection type.
-    rep_level_ = rep_levels_.ReadLevel();
-    // Reset position counter if we are at the start of a new parent collection.
-    if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
-  }
-
-  return parent_->parse_status_.ok();
-}
-
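
For reference, a hand-worked example (not from this commit) of the levels NextLevels() walks through, for a top-level one-level array column `repeated int32 nums`, where max_def_level and max_rep_level are both 1:

  row [1, 2]:  (rep=0, def=1, value=1), (rep=1, def=1, value=2)
  row []:      (rep=0, def=0)            -- no stored value
  row [3]:     (rep=0, def=1, value=3)

A rep_level of 0 marks the first entry of a new top-level row (which is why the position counter resets when rep_level_ <= max_rep_level() - 1), and a def_level below max_def_level() means there is no value to materialize for that entry.
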
-bool HdfsParquetScanner::BaseScalarColumnReader::NextPage() {
-  parent_->assemble_rows_timer_.Stop();
-  parent_->parse_status_ = ReadDataPage();
-  if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-  if (num_buffered_values_ == 0) {
-    rep_level_ = ROW_GROUP_END;
-    def_level_ = INVALID_LEVEL;
-    pos_current_value_ = INVALID_POS;
-    return false;
-  }
-  parent_->assemble_rows_timer_.Start();
-  return true;
-}
-
-bool HdfsParquetScanner::CollectionColumnReader::NextLevels() {
-  DCHECK(!children_.empty());
-  DCHECK_LE(rep_level_, new_collection_rep_level());
-  for (int c = 0; c < children_.size(); ++c) {
-    do {
-      // TODO(skye): verify somewhere that all column readers are at end
-      if (!children_[c]->NextLevels()) return false;
-    } while (children_[c]->rep_level() > new_collection_rep_level());
-  }
-  UpdateDerivedState();
-  return true;
-}
-
-bool HdfsParquetScanner::CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
-  DCHECK_GE(rep_level_, 0);
-  DCHECK_GE(def_level_, 0);
-  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
-      "Caller should have called NextLevels() until we are ready to read a value";
-
-  if (tuple_offset_ == -1) {
-    return CollectionColumnReader::NextLevels();
-  } else if (def_level_ >= max_def_level()) {
-    return ReadSlot(tuple->GetSlot(tuple_offset_), pool);
-  } else {
-    // Null value
-    tuple->SetNull(null_indicator_offset_);
-    return CollectionColumnReader::NextLevels();
-  }
-}
-
-bool HdfsParquetScanner::CollectionColumnReader::ReadNonRepeatedValue(
-    MemPool* pool, Tuple* tuple) {
-  return CollectionColumnReader::ReadValue(pool, tuple);
-}
-
-bool HdfsParquetScanner::CollectionColumnReader::ReadSlot(void* slot, MemPool* pool) {
-  DCHECK(!children_.empty());
-  DCHECK_LE(rep_level_, new_collection_rep_level());
-
-  // Recursively read the collection into a new CollectionValue.
-  CollectionValue* coll_slot = reinterpret_cast<CollectionValue*>(slot);
-  *coll_slot = CollectionValue();
-  CollectionValueBuilder builder(
-      coll_slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
-  bool continue_execution = parent_->AssembleCollection(
-      children_, new_collection_rep_level(), &builder);
-  if (!continue_execution) return false;
-
-  // AssembleCollection() advances child readers, so we don't need to call NextLevels()
-  UpdateDerivedState();
-  return true;
-}
-
-void HdfsParquetScanner::CollectionColumnReader::UpdateDerivedState() {
-  // We don't need to cap our def_level_ at max_def_level(). We always check def_level_
-  // >= max_def_level() to check if the collection is defined.
-  // TODO(skye): consider capping def_level_ at max_def_level()
-  def_level_ = children_[0]->def_level();
-  rep_level_ = children_[0]->rep_level();
-
-  // All children should have been advanced to the beginning of the next collection
-  for (int i = 0; i < children_.size(); ++i) {
-    DCHECK_EQ(children_[i]->rep_level(), rep_level_);
-    if (def_level_ < max_def_level()) {
-      // Collection not defined
-      FILE_CHECK_EQ(children_[i]->def_level(), def_level_);
-    } else {
-      // Collection is defined
-      FILE_CHECK_GE(children_[i]->def_level(), max_def_level());
-    }
-  }
-
-  if (RowGroupAtEnd()) {
-    // No more values
-    pos_current_value_ = INVALID_POS;
-  } else if (rep_level_ <= max_rep_level() - 2) {
-    // Reset position counter if we are at the start of a new parent collection (i.e.,
-    // the current collection is the first item in a new parent collection).
-    pos_current_value_ = 0;
-  }
-}
-
-Status HdfsParquetScanner::ValidateColumnOffsets(const parquet::RowGroup& row_group) {
-  const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
-  for (int i = 0; i < row_group.columns.size(); ++i) {
-    const parquet::ColumnChunk& col_chunk = row_group.columns[i];
-    int64_t col_start = col_chunk.meta_data.data_page_offset;
-    // The file format requires that if a dictionary page exists, it be before data pages.
-    if (col_chunk.meta_data.__isset.dictionary_page_offset) {
-      if (col_chunk.meta_data.dictionary_page_offset >= col_start) {
-        stringstream ss;
-        ss << "File " << file_desc->filename << ": metadata is corrupt. "
-            << "Dictionary page (offset=" << col_chunk.meta_data.dictionary_page_offset
-            << ") must come before any data pages (offset=" << col_start << ").";
-        return Status(ss.str());
-      }
-      col_start = col_chunk.meta_data.dictionary_page_offset;
-    }
-    int64_t col_len = col_chunk.meta_data.total_compressed_size;
-    int64_t col_end = col_start + col_len;
-    if (col_end <= 0 || col_end > file_desc->file_length) {
-      stringstream ss;
-      ss << "File " << file_desc->filename << ": metadata is corrupt. "
-          << "Column " << i << " has invalid column offsets "
-          << "(offset=" << col_start << ", size=" << col_len << ", "
-          << "file_size=" << file_desc->file_length << ").";
-      return Status(ss.str());
-    }
-  }
-  return Status::OK();
-}
-
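
ValidateColumnOffsets() (moved into ParquetMetadataUtils by this patch series) enforces two per-column invariants: a dictionary page, if present, must start before the first data page, and the chunk's byte range must lie within the file. A standalone sketch of those checks using plain integers instead of the Thrift metadata (illustrative only):

#include <cstdint>
#include <cstdio>

// Simplified column-chunk metadata; mirrors the fields checked above.
struct ChunkMeta {
  int64_t data_page_offset;
  bool has_dictionary;
  int64_t dictionary_page_offset;
  int64_t total_compressed_size;
};

bool ValidateChunk(const ChunkMeta& c, int64_t file_length) {
  int64_t col_start = c.data_page_offset;
  if (c.has_dictionary) {
    // The dictionary page must come before any data pages.
    if (c.dictionary_page_offset >= col_start) return false;
    col_start = c.dictionary_page_offset;
  }
  int64_t col_end = col_start + c.total_compressed_size;
  // The whole chunk must lie inside the file.
  return col_end > 0 && col_end <= file_length;
}

int main() {
  ChunkMeta ok{100, true, 40, 500};
  ChunkMeta bad{100, true, 200, 500};   // dictionary after the data pages
  std::printf("%d %d\n", ValidateChunk(ok, 10000), ValidateChunk(bad, 10000));
  return 0;
}
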
 // Get the start of the column.
 static int64_t GetColumnStartOffset(const parquet::ColumnMetaData& column) {
   if (column.__isset.dictionary_page_offset) {
@@ -1851,18 +243,18 @@ static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) {
   return start_offset + (end_offset - start_offset) / 2;
 }
 
-int HdfsParquetScanner::CountScalarColumns(const vector<ColumnReader*>& column_readers) {
+int HdfsParquetScanner::CountScalarColumns(const vector<ParquetColumnReader*>& column_readers) {
   DCHECK(!column_readers.empty());
   int num_columns = 0;
-  stack<ColumnReader*> readers;
-  for (ColumnReader* r: column_readers_) readers.push(r);
+  stack<ParquetColumnReader*> readers;
+  for (ParquetColumnReader* r: column_readers_) readers.push(r);
   while (!readers.empty()) {
-    ColumnReader* col_reader = readers.top();
+    ParquetColumnReader* col_reader = readers.top();
     readers.pop();
     if (col_reader->IsCollectionReader()) {
       CollectionColumnReader* collection_reader =
           static_cast<CollectionColumnReader*>(col_reader);
-      for (ColumnReader* r: *collection_reader->children()) readers.push(r);
+      for (ParquetColumnReader* r: *collection_reader->children()) readers.push(r);
       continue;
     }
     ++num_columns;
@@ -1875,11 +267,16 @@ Status HdfsParquetScanner::ProcessSplit() {
   // First process the file metadata in the footer
   bool eosr;
   RETURN_IF_ERROR(ProcessFooter(&eosr));
-
   if (eosr) return Status::OK();
 
+  // Parse the file schema into an internal representation for schema resolution.
+  ParquetSchemaResolver schema_resolver(*scan_node_->hdfs_table(),
+      state_->query_options().parquet_fallback_schema_resolution);
+  RETURN_IF_ERROR(schema_resolver.Init(&file_metadata_, filename()));
+
   // We've processed the metadata and there are columns that need to be materialized.
-  RETURN_IF_ERROR(CreateColumnReaders(*scan_node_->tuple_desc(), &column_readers_));
+  RETURN_IF_ERROR(
+      CreateColumnReaders(*scan_node_->tuple_desc(), schema_resolver, &column_readers_));
   COUNTER_SET(num_cols_counter_,
       static_cast<int64_t>(CountScalarColumns(column_readers_)));
   // Set top-level template tuple.
@@ -1895,9 +292,11 @@ Status HdfsParquetScanner::ProcessSplit() {
     const parquet::RowGroup& row_group = file_metadata_.row_groups[i];
     if (row_group.num_rows == 0) continue;
 
-    const DiskIoMgr::ScanRange* split_range =
-        reinterpret_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
-    RETURN_IF_ERROR(ValidateColumnOffsets(row_group));
+    const DiskIoMgr::ScanRange* split_range = reinterpret_cast<ScanRangeMetadata*>(
+        metadata_range_->meta_data())->original_split;
+    HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
+    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets(
+        file_desc->filename, file_desc->file_length, row_group));
 
     int64_t row_group_mid_pos = GetRowGroupMidOffset(row_group);
     int64_t split_offset = split_range->offset();
@@ -1919,7 +318,7 @@ Status HdfsParquetScanner::ProcessSplit() {
 
     // Prepare column readers for first read
     bool continue_execution = true;
-    for (ColumnReader* col_reader: column_readers_) {
+    for (ParquetColumnReader* col_reader: column_readers_) {
       // Seed collection and boolean column readers with NextLevel().
       // The ScalarColumnReaders use an optimized ReadValueBatch() that
       // should not be seeded.
@@ -1950,7 +349,7 @@ Status HdfsParquetScanner::ProcessSplit() {
     // with parse_status_.
     RETURN_IF_ERROR(state_->GetQueryStatus());
     if (UNLIKELY(!parse_status_.ok())) {
-      RETURN_IF_ERROR(LogOrReturnError(parse_status_.msg()));
+      RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
     }
     if (scan_node_->ReachedLimit()) return Status::OK();
     if (context_->cancelled()) return Status::OK();
@@ -2078,7 +477,7 @@ bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) {
 /// difficult to maintain a maximum memory footprint without throwing away at least
 /// some work. This point needs further experimentation and thought.
 bool HdfsParquetScanner::AssembleRows(
-    const vector<ColumnReader*>& column_readers, int row_group_idx, bool* filters_pass) {
+    const vector<ParquetColumnReader*>& column_readers, int row_group_idx, bool* filters_pass) {
   DCHECK(!column_readers.empty());
   DCHECK(scratch_batch_ != NULL);
 
@@ -2111,7 +510,7 @@ bool HdfsParquetScanner::AssembleRows(
     int last_num_tuples = -1;
     int num_col_readers = column_readers.size();
     for (int c = 0; c < num_col_readers; ++c) {
-      ColumnReader* col_reader = column_readers[c];
+      ParquetColumnReader* col_reader = column_readers[c];
       if (col_reader->max_rep_level() > 0) {
         continue_execution = col_reader->ReadValueBatch(
             scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_,
@@ -2150,7 +549,7 @@ bool HdfsParquetScanner::AssembleRows(
 }
 
 bool HdfsParquetScanner::AssembleCollection(
-    const vector<ColumnReader*>& column_readers, int new_collection_rep_level,
+    const vector<ParquetColumnReader*>& column_readers, int new_collection_rep_level,
     CollectionValueBuilder* coll_value_builder) {
   DCHECK(!column_readers.empty());
   DCHECK_GE(new_collection_rep_level, 0);
@@ -2229,13 +628,13 @@ bool HdfsParquetScanner::AssembleCollection(
 }
 
 inline bool HdfsParquetScanner::ReadCollectionItem(
-    const vector<ColumnReader*>& column_readers,
+    const vector<ParquetColumnReader*>& column_readers,
     bool materialize_tuple, MemPool* pool, Tuple* tuple) const {
   DCHECK(!column_readers.empty());
   bool continue_execution = true;
   int size = column_readers.size();
   for (int c = 0; c < size; ++c) {
-    ColumnReader* col_reader = column_readers[c];
+    ParquetColumnReader* col_reader = column_readers[c];
     if (materialize_tuple) {
       // All column readers for this tuple should have a value to materialize.
       FILE_CHECK_GE(col_reader->def_level(),
@@ -2364,9 +763,11 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
         status.GetDetail()));
   }
 
-  RETURN_IF_ERROR(ValidateFileMetadata());
-  // Parse file schema
-  RETURN_IF_ERROR(CreateSchemaTree(file_metadata_.schema, &schema_));
+  RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(file_metadata_, filename()));
+  // Parse out the created by application version string
+  if (file_metadata_.__isset.created_by) {
+    file_version_ = ParquetFileVersion(file_metadata_.created_by);
+  }
 
   if (scan_node_->IsZeroSlotTableScan()) {
     // There are no materialized slots, e.g. count(*) over the table.  We can serve
@@ -2398,301 +799,12 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
     return Status(
         Substitute("Invalid file. This file: $0 has no row groups", filename()));
   }
-  if (schema_.children.empty()) {
-    return Status(Substitute("Invalid file: '$0' has no columns.", filename()));
-  }
-  return Status::OK();
-}
-
-Status HdfsParquetScanner::ResolvePath(const SchemaPath& path, SchemaNode** node,
-    bool* pos_field, bool* missing_field) {
-  *missing_field = false;
-  // First try two-level array encoding.
-  bool missing_field_two_level;
-  Status status_two_level =
-      ResolvePathHelper(TWO_LEVEL, path, node, pos_field, &missing_field_two_level);
-  if (missing_field_two_level) DCHECK(status_two_level.ok());
-  if (status_two_level.ok() && !missing_field_two_level) return Status::OK();
-  // The two-level resolution failed or reported a missing field, try three-level array
-  // encoding.
-  bool missing_field_three_level;
-  Status status_three_level =
-      ResolvePathHelper(THREE_LEVEL, path, node, pos_field, &missing_field_three_level);
-  if (missing_field_three_level) DCHECK(status_three_level.ok());
-  if (status_three_level.ok() && !missing_field_three_level) return Status::OK();
-  // The three-level resolution failed or reported a missing field, try one-level array
-  // encoding.
-  bool missing_field_one_level;
-  Status status_one_level =
-      ResolvePathHelper(ONE_LEVEL, path, node, pos_field, &missing_field_one_level);
-  if (missing_field_one_level) DCHECK(status_one_level.ok());
-  if (status_one_level.ok() && !missing_field_one_level) return Status::OK();
-  // None of the resolutions yielded a node. Set *missing_field to true if any of
-  // the resolutions reported a missing field.
-  if (missing_field_one_level || missing_field_two_level || missing_field_three_level) {
-    *node = NULL;
-    *missing_field = true;
-    return Status::OK();
-  }
-  // All resolutions failed. Log and return the status from the three-level resolution
-  // (which is technically the standard).
-  DCHECK(!status_one_level.ok() && !status_two_level.ok() && !status_three_level.ok());
-  *node = NULL;
-  VLOG_QUERY << status_three_level.msg().msg() << "\n" << GetStackTrace();
-  return status_three_level;
-}
-
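
ResolvePath() above attempts two-level resolution first, then three-level, then one-level; a cleanly reported missing field is not an error, and only when every attempt fails does it surface the three-level error, since that encoding is the spec's standard form. A stripped-down sketch of that fallback ordering, with a hypothetical resolver callback standing in for ResolvePathHelper() (not part of this commit):

#include <cstdio>
#include <functional>
#include <string>

enum ArrayEncoding { ONE_LEVEL, TWO_LEVEL, THREE_LEVEL };

struct Result {
  bool ok;            // resolution succeeded or cleanly reported a missing field
  bool missing_field; // path not present under this encoding
  std::string error;  // set when !ok
};

// Tries each encoding in the same order as ResolvePath(): TWO, THREE, ONE.
Result ResolveWithFallback(const std::function<Result(ArrayEncoding)>& resolve) {
  Result two = resolve(TWO_LEVEL);
  if (two.ok && !two.missing_field) return two;
  Result three = resolve(THREE_LEVEL);
  if (three.ok && !three.missing_field) return three;
  Result one = resolve(ONE_LEVEL);
  if (one.ok && !one.missing_field) return one;
  if (two.missing_field || three.missing_field || one.missing_field) {
    return Result{true, true, ""};  // consistently missing: not an error
  }
  return three;                     // report the three-level (standard) error
}

int main() {
  // Example: only the three-level attempt resolves the path.
  Result r = ResolveWithFallback([](ArrayEncoding e) {
    if (e == THREE_LEVEL) return Result{true, false, ""};
    return Result{false, false, "unrecognized schema"};
  });
  std::printf("ok=%d missing=%d\n", r.ok, r.missing_field);
  return 0;
}
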
-Status HdfsParquetScanner::ResolvePathHelper(ArrayEncoding array_encoding,
-    const SchemaPath& path, SchemaNode** node, bool* pos_field, bool* missing_field) {
-  DCHECK(schema_.element != NULL)
-      << "schema_ must be initialized before calling ResolvePath()";
-
-  *pos_field = false;
-  *missing_field = false;
-  *node = &schema_;
-  const ColumnType* col_type = NULL;
-
-  // Traverse 'path' and resolve 'node' to the corresponding SchemaNode in 'schema_' (by
-  // ordinal), or set 'node' to NULL if 'path' doesn't exist in this file's schema.
-  for (int i = 0; i < path.size(); ++i) {
-    // Advance '*node' if necessary
-    if (i == 0 || col_type->type != TYPE_ARRAY || array_encoding == THREE_LEVEL) {
-      *node = NextSchemaNode(col_type, path, i, *node, missing_field);
-      if (*missing_field) return Status::OK();
-    } else {
-      // We just resolved an array, meaning *node is set to the repeated field of the
-      // array. Since we are trying to resolve using one- or two-level array encoding, the
-      // repeated field represents both the array and the array's item (i.e. there is no
-      // explicit item field), so we don't advance *node in this case.
-      DCHECK(col_type != NULL);
-      DCHECK_EQ(col_type->type, TYPE_ARRAY);
-      DCHECK(array_encoding == ONE_LEVEL || array_encoding == TWO_LEVEL);
-      DCHECK((*node)->is_repeated());
-    }
-
-    // Advance 'col_type'
-    int table_idx = path[i];
-    col_type = i == 0 ? &scan_node_->hdfs_table()->col_descs()[table_idx].type()
-               : &col_type->children[table_idx];
-
-    // Resolve path[i]
-    if (col_type->type == TYPE_ARRAY) {
-      DCHECK_EQ(col_type->children.size(), 1);
-      RETURN_IF_ERROR(
-          ResolveArray(array_encoding, path, i, node, pos_field, missing_field));
-      if (*missing_field || *pos_field) return Status::OK();
-    } else if (col_type->type == TYPE_MAP) {
-      DCHECK_EQ(col_type->children.size(), 2);
-      RETURN_IF_ERROR(ResolveMap(path, i, node, missing_field));
-      if (*missing_field) return Status::OK();
-    } else if (col_type->type == TYPE_STRUCT) {
-      DCHECK_GT(col_type->children.size(), 0);
-      // Nothing to do for structs
-    } else {
-      DCHECK(!col_type->IsComplexType());
-      DCHECK_EQ(i, path.size() - 1);
-      RETURN_IF_ERROR(ValidateScalarNode(**node, *col_type, path, i));
-    }
-  }
-  DCHECK(*node != NULL);
-  return Status::OK();
-}
-
-HdfsParquetScanner::SchemaNode* HdfsParquetScanner::NextSchemaNode(
-    const ColumnType* col_type, const SchemaPath& path, int next_idx, SchemaNode* node,
-    bool* missing_field) {
-  DCHECK_LT(next_idx, path.size());
-  if (next_idx != 0) DCHECK(col_type != NULL);
-
-  int file_idx;
-  int table_idx = path[next_idx];
-  bool resolve_by_name = state_->query_options().parquet_fallback_schema_resolution ==
-      TParquetFallbackSchemaResolution::NAME;
-  if (resolve_by_name) {
-    if (next_idx == 0) {
-      // Resolve top-level table column by name.
-      DCHECK_LT(table_idx, scan_node_->hdfs_table()->col_descs().size());
-      const string& name = scan_node_->hdfs_table()->col_descs()[table_idx].name();
-      file_idx = FindChildWithName(node, name);
-    } else if (col_type->type == TYPE_STRUCT) {
-      // Resolve struct field by name.
-      DCHECK_LT(table_idx, col_type->field_names.size());
-      const string& name = col_type->field_names[table_idx];
-      file_idx = FindChildWithName(node, name);
-    } else if (col_type->type == TYPE_ARRAY) {
-      // Arrays have only one child in the file.
-      DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM);
-      file_idx = table_idx;
-    } else {
-      DCHECK_EQ(col_type->type, TYPE_MAP);
-      // Maps have two values, "key" and "value". These are supposed to be ordered and may
-      // not have the right field names, but try to resolve by name in case they're
-      // switched and otherwise use the order. See
-      // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for
-      // more details.
-      DCHECK(table_idx == SchemaPathConstants::MAP_KEY ||
-             table_idx == SchemaPathConstants::MAP_VALUE);
-      const string& name = table_idx == SchemaPathConstants::MAP_KEY ? "key" : "value";
-      file_idx = FindChildWithName(node, name);
-      if (file_idx >= node->children.size()) {
-        // Couldn't resolve by name, fall back to resolution by position.
-        file_idx = table_idx;
-      }
-    }
-  } else {
-    // Resolution by position.
-    DCHECK_EQ(state_->query_options().parquet_fallback_schema_resolution,
-        TParquetFallbackSchemaResolution::POSITION);
-    if (next_idx == 0) {
-      // For top-level columns, the first index in a path includes the table's partition
-      // keys.
-      file_idx = table_idx - scan_node_->num_partition_keys();
-    } else {
-      file_idx = table_idx;
-    }
-  }
-
-  if (file_idx >= node->children.size()) {
-    VLOG_FILE << Substitute(
-        "File '$0' does not contain path '$1' (resolving by $2)", filename(),
-        PrintPath(path), resolve_by_name ? "name" : "position");
-    *missing_field = true;
-    return NULL;
-  }
-  return &node->children[file_idx];
-}
-
-int HdfsParquetScanner::FindChildWithName(HdfsParquetScanner::SchemaNode* node,
-    const string& name) {
-  int idx;
-  for (idx = 0; idx < node->children.size(); ++idx) {
-    if (node->children[idx].element->name == name) break;
-  }
-  return idx;
-}
-
-// There are three types of array encodings:
-//
-// 1. One-level encoding
-//      A bare repeated field. This is interpreted as a required array of required
-//      items.
-//    Example:
-//      repeated <item-type> item;
-//
-// 2. Two-level encoding
-//      A group containing a single repeated field. This is interpreted as a
-//      <list-repetition> array of required items (<list-repetition> is either
-//      optional or required).
-//    Example:
-//      <list-repetition> group <name> {
-//        repeated <item-type> item;
-//      }
-//
-// 3. Three-level encoding
-//      The "official" encoding according to the parquet spec. A group containing a
-//      single repeated group containing the item field. This is interpreted as a
-//      <list-repetition> array of <item-repetition> items (<list-repetition> and
-//      <item-repetition> are each either optional or required).
-//    Example:
-//      <list-repetition> group <name> {
-//        repeated group list {
-//          <item-repetition> <item-type> item;
-//        }
-//      }
-//
-// We ignore any field annotations or names, making us more permissive than the
-// Parquet spec dictates. Note that in any of the encodings, <item-type> may be a
-// group containing more fields, which corresponds to a complex item type. See
-// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists for
-// more details and examples.
-//
-// This function resolves the array at '*node' assuming one-, two-, or three-level
-// encoding, determined by 'array_encoding'. '*node' is set to the repeated field for all
-// three encodings (unless '*pos_field' or '*missing_field' are set to true).
-Status HdfsParquetScanner::ResolveArray(ArrayEncoding array_encoding,
-    const SchemaPath& path, int idx, SchemaNode** node, bool* pos_field,
-    bool* missing_field) {
-  if (array_encoding == ONE_LEVEL) {
-    if (!(*node)->is_repeated()) {
-      ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename(),
-          PrintPath(path, idx), "array", (*node)->DebugString());
-      return Status::Expected(msg);
-    }
-  } else {
-    // In the multi-level case, we always expect the outer group to contain a single
-    // repeated field
-    if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated()) {
-      ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename(),
-          PrintPath(path, idx), "array", (*node)->DebugString());
-      return Status::Expected(msg);
-    }
-    // Set *node to the repeated field
-    *node = &(*node)->children[0];
-  }
-  DCHECK((*node)->is_repeated());
-
-  if (idx + 1 < path.size()) {
-    if (path[idx + 1] == SchemaPathConstants::ARRAY_POS) {
-      // The next index in 'path' is the artificial position field.
-      DCHECK_EQ(path.size(), idx + 2) << "position field cannot have children!";
-      *pos_field = true;
-      *node = NULL;
-      return Status::OK();
-    } else {
-      // The n

<TRUNCATED>


[3/5] incubator-impala git commit: IMPALA-3845: Split up hdfs-parquet-scanner.cc into more files/components.

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 5a12602..3791ae1 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -18,13 +18,27 @@
 
 #include "exec/hdfs-scanner.h"
 #include "exec/parquet-common.h"
+#include "exec/parquet-scratch-tuple-batch.h"
+#include "exec/parquet-metadata-utils.h"
 #include "util/runtime-profile-counters.h"
 
 namespace impala {
 
 class CollectionValueBuilder;
 struct HdfsFileDesc;
-struct ScratchTupleBatch;
+
+/// Internal schema representation and resolution.
+class SchemaNode;
+
+/// Class that implements Parquet definition and repetition level decoding.
+class ParquetLevelDecoder;
+
+/// Per column reader.
+class ParquetColumnReader;
+class CollectionColumnReader;
+class BaseScalarColumnReader;
+template<typename T, bool MATERIALIZED> class ScalarColumnReader;
+class BoolColumnReader;
 
 /// This scanner parses Parquet files located in HDFS, and writes the content as tuples in
 /// the Impala in-memory representation of data, e.g.  (tuples, rows, row batches).
@@ -305,7 +319,7 @@ class HdfsParquetScanner : public HdfsScanner {
  public:
   HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state);
 
-  virtual ~HdfsParquetScanner();
+  virtual ~HdfsParquetScanner() {};
   virtual Status Prepare(ScannerContext* context);
   virtual void Close();
   virtual Status ProcessSplit();
@@ -315,100 +329,24 @@ class HdfsParquetScanner : public HdfsScanner {
   static Status IssueInitialRanges(HdfsScanNode* scan_node,
                                    const std::vector<HdfsFileDesc*>& files);
 
-  struct FileVersion {
-    /// Application that wrote the file. e.g. "IMPALA"
-    std::string application;
-
-    /// Version of the application that wrote the file, expressed in three parts
-    /// (<major>.<minor>.<patch>). Unspecified parts default to 0, and extra parts are
-    /// ignored. e.g.:
-    /// "1.2.3"    => {1, 2, 3}
-    /// "1.2"      => {1, 2, 0}
-    /// "1.2-cdh5" => {1, 2, 0}
-    struct {
-      int major;
-      int minor;
-      int patch;
-    } version;
-
-    /// If true, this file was generated by an Impala internal release
-    bool is_impala_internal;
-
-    FileVersion() : is_impala_internal(false) { }
-
-    /// Parses the version from the created_by string
-    FileVersion(const std::string& created_by);
-
-    /// Returns true if version is strictly less than <major>.<minor>.<patch>
-    bool VersionLt(int major, int minor = 0, int patch = 0) const;
-
-    /// Returns true if version is equal to <major>.<minor>.<patch>
-    bool VersionEq(int major, int minor, int patch) const;
-  };
-
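
The removed FileVersion struct (ParquetFileVersion after this split) compares the parsed {major, minor, patch} triple via VersionLt()/VersionEq(). A rough sketch of that comparison; parsing of the created_by string and the application name is omitted, and the type below is illustrative only:

#include <cstdio>
#include <tuple>

// Version triple comparison in the spirit of VersionLt()/VersionEq().
struct VersionTriple {
  int major, minor, patch;

  bool Lt(int ma, int mi = 0, int pa = 0) const {
    return std::tie(major, minor, patch) < std::tie(ma, mi, pa);
  }
  bool Eq(int ma, int mi, int pa) const {
    return major == ma && minor == mi && patch == pa;
  }
};

int main() {
  VersionTriple v{1, 2, 0};   // e.g. parsed from "... 1.2-cdh5 ..." => {1, 2, 0}
  std::printf("Lt(1,2,3)=%d Eq(1,2,0)=%d Lt(1,1)=%d\n",
              v.Lt(1, 2, 3), v.Eq(1, 2, 0), v.Lt(1, 1));
  return 0;
}
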
- private:
-  /// Internal representation of a column schema (including nested-type columns).
-  struct SchemaNode {
-    /// The corresponding schema element defined in the file metadata
-    const parquet::SchemaElement* element;
-
-    /// The index into the RowGroup::columns list if this column is materialized in the
-    /// file (i.e. it's a scalar type). -1 for nested types.
-    int col_idx;
-
-    /// The maximum definition level of this column, i.e., the definition level that
-    /// corresponds to a non-NULL value. Valid values are >= 0.
-    int max_def_level;
-
-    /// The maximum repetition level of this column. Valid values are >= 0.
-    int max_rep_level;
-
-    /// The definition level of the most immediate ancestor of this node with repeated
-    /// field repetition type. 0 if there are no repeated ancestors.
-    int def_level_of_immediate_repeated_ancestor;
-
-    /// Any nested schema nodes. Empty for non-nested types.
-    std::vector<SchemaNode> children;
-
-    SchemaNode() : element(NULL), col_idx(-1), max_def_level(-1), max_rep_level(-1),
-                   def_level_of_immediate_repeated_ancestor(-1) { }
-
-    std::string DebugString(int indent = 0) const;
-
-    bool is_repeated() const {
-      return element->repetition_type == parquet::FieldRepetitionType::REPEATED;
-    }
-  };
-
-  /// Size of the file footer.  This is a guess.  If this value is too little, we will
-  /// need to issue another read.
-  static const int64_t FOOTER_SIZE;
-
   /// The repetition level is set to this value to indicate the end of a row group.
-  static const int16_t ROW_GROUP_END;
+  static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min();
   /// Indicates an invalid definition or repetition level.
-  static const int16_t INVALID_LEVEL;
+  static const int16_t INVALID_LEVEL = -1;
   /// Indicates an invalid position value.
-  static const int16_t INVALID_POS;
-
-  /// Class that implements Parquet definition and repetition level decoding.
-  class LevelDecoder;
+  static const int16_t INVALID_POS = -1;
 
-  /// Per column reader.
-  class ColumnReader;
-  friend class ColumnReader;
-
-  class CollectionColumnReader;
+ private:
+  friend class ParquetColumnReader;
   friend class CollectionColumnReader;
-
-  class BaseScalarColumnReader;
   friend class BaseScalarColumnReader;
-
-  template<typename T, bool MATERIALIZED> class ScalarColumnReader;
   template<typename T, bool MATERIALIZED> friend class ScalarColumnReader;
-  class BoolColumnReader;
   friend class BoolColumnReader;
 
+  /// Size of the file footer.  This is a guess.  If this value is too little, we will
+  /// need to issue another read.
+  static const int64_t FOOTER_SIZE = 1024 * 100;
+
   /// Cached runtime filter contexts, one for each filter that applies to this column.
   vector<const FilterContext*> filter_ctxs_;
 
@@ -443,7 +381,7 @@ class HdfsParquetScanner : public HdfsScanner {
   vector<LocalFilterStats> filter_stats_;
 
   /// Column reader for each materialized columns for this file.
-  std::vector<ColumnReader*> column_readers_;
+  std::vector<ParquetColumnReader*> column_readers_;
 
   /// Column readers will write slot values into this scratch batch for
   /// top-level tuples. See AssembleRows().
@@ -453,10 +391,7 @@ class HdfsParquetScanner : public HdfsScanner {
   parquet::FileMetaData file_metadata_;
 
   /// Version of the application that wrote this file.
-  FileVersion file_version_;
-
-  /// The root schema node for this file.
-  SchemaNode schema_;
+  ParquetFileVersion file_version_;
 
   /// Scan range for the metadata.
   const DiskIoMgr::ScanRange* metadata_range_;
@@ -493,7 +428,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// If 'filters_pass' is set to false by this method, the partition columns associated
   /// with this row group did not pass all the runtime filters (and therefore only filter
   /// contexts that apply only to partition columns are checked).
-  bool AssembleRows(const std::vector<ColumnReader*>& column_readers,
+  bool AssembleRows(const std::vector<ParquetColumnReader*>& column_readers,
       int row_group_idx, bool* filters_pass);
 
   /// Evaluates runtime filters and conjuncts (if any) against the tuples in
@@ -522,7 +457,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// - scan node limit was reached
   /// When false is returned the column_readers are left in an undefined state and
   /// execution should be aborted immediately by the caller.
-  bool AssembleCollection(const std::vector<ColumnReader*>& column_readers,
+  bool AssembleCollection(const std::vector<ParquetColumnReader*>& column_readers,
       int new_collection_rep_level, CollectionValueBuilder* coll_value_builder);
 
   /// Function used by AssembleCollection() to materialize a single collection item
@@ -530,17 +465,13 @@ class HdfsParquetScanner : public HdfsScanner {
   /// otherwise returns true.
   /// If 'materialize_tuple' is false, only advances the column readers' levels,
   /// and does not read any data values.
-  inline bool ReadCollectionItem(const std::vector<ColumnReader*>& column_readers,
+  inline bool ReadCollectionItem(const std::vector<ParquetColumnReader*>& column_readers,
       bool materialize_tuple, MemPool* pool, Tuple* tuple) const;
 
   /// Find and return the last split in the file if it is assigned to this scan node.
   /// Returns NULL otherwise.
   static DiskIoMgr::ScanRange* FindFooterSplit(HdfsFileDesc* file);
 
-  /// Validate column offsets by checking if the dictionary page comes before the data
-  /// pages and checking if the column offsets lie within the file.
-  Status ValidateColumnOffsets(const parquet::RowGroup& row_group);
-
   /// Process the file footer and parse file_metadata_.  This should be called with the
   /// last FOOTER_SIZE bytes in context_.
   /// *eosr is a return value.  If true, the scan range is complete (e.g. select count(*))
@@ -551,23 +482,12 @@ class HdfsParquetScanner : public HdfsScanner {
   /// well. Fills in the appropriate template tuple slot with NULL for any materialized
   /// fields missing in the file.
   Status CreateColumnReaders(const TupleDescriptor& tuple_desc,
-      std::vector<ColumnReader*>* column_readers);
+      const ParquetSchemaResolver& schema_resolver,
+      std::vector<ParquetColumnReader*>* column_readers);
 
   /// Returns the total number of scalar column readers in 'column_readers', including
   /// the children of collection readers.
-  int CountScalarColumns(const std::vector<ColumnReader*>& column_readers);
-
-  /// Creates a column reader for 'node'. slot_desc may be NULL, in which case the
-  /// returned column reader can only be used to read def/rep levels.
-  /// 'is_collection_field' should be set to true if the returned reader is reading a
-  /// collection. This cannot be determined purely by 'node' because a repeated scalar
-  /// node represents both an array and the array's items (in this case
-  /// 'is_collection_field' should be true if the reader reads one value per array, and
-  /// false if it reads one value per item).  The reader is added to the runtime state's
-  /// object pool. Does not create child readers for collection readers; these must be
-  /// added by the caller.
-  ColumnReader* CreateReader(const SchemaNode& node, bool is_collection_field,
-      const SlotDescriptor* slot_desc);
+  int CountScalarColumns(const std::vector<ParquetColumnReader*>& column_readers);
 
   /// Creates a column reader that reads one value for each item in the table or
   /// collection element corresponding to 'parent_path'. 'parent_path' should point to
@@ -578,89 +498,24 @@ class HdfsParquetScanner : public HdfsScanner {
   /// This is used for counting item values, rather than materializing any values. For
   /// example, in a count(*) over a collection, there are no values to materialize, but we
   /// still need to iterate over every item in the collection to count them.
-  Status CreateCountingReader(
-      const SchemaPath& parent_path, ColumnReader** reader);
+  Status CreateCountingReader(const SchemaPath& parent_path,
+      const ParquetSchemaResolver& schema_resolver,
+      ParquetColumnReader** reader);
 
   /// Walks file_metadata_ and initiates reading the materialized columns.  This
   /// initializes 'column_readers' and issues the reads for the columns. 'column_readers'
   /// should be the readers used to materialize a single tuple (i.e., column_readers_ or
   /// the children of a collection node).
   Status InitColumns(
-      int row_group_idx, const std::vector<ColumnReader*>& column_readers);
-
-  /// Validates the file metadata
-  Status ValidateFileMetadata();
-
-  /// Validates the column metadata to make sure this column is supported (e.g. encoding,
-  /// type, etc) and matches the type of col_reader's slot desc.
-  Status ValidateColumn(const BaseScalarColumnReader& col_reader, int row_group_idx);
+      int row_group_idx, const std::vector<ParquetColumnReader*>& column_readers);
 
   /// Performs some validation once we've reached the end of a row group to help detect
   /// bugs or bad input files.
-  Status ValidateEndOfRowGroup(const std::vector<ColumnReader*>& column_readers,
+  Status ValidateEndOfRowGroup(const std::vector<ParquetColumnReader*>& column_readers,
       int row_group_idx, int64_t rows_read);
 
   /// Part of the HdfsScanner interface, not used in Parquet.
   Status InitNewRange() { return Status::OK(); };
-
-  /// Unflattens the schema metadata from a Parquet file metadata and converts it to our
-  /// SchemaNode representation. Returns the result in 'n' unless an error status is
-  /// returned. Does not set the slot_desc field of any SchemaNode.
-  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
-      SchemaNode* node) const;
-
-  /// Recursive implementation used internally by the above CreateSchemaTree() function.
-  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
-      int max_def_level, int max_rep_level, int ira_def_level, int* idx, int* col_idx,
-      SchemaNode* node) const;
-
-  /// Traverses 'schema_' according to 'path', returning the result in 'node'. If 'path'
-  /// does not exist in this file's schema, 'missing_field' is set to true and
-  /// Status::OK() is returned, otherwise 'missing_field' is set to false. If 'path'
-  /// resolves to a collection position field, *pos_field is set to true. Otherwise
-  /// 'pos_field' is set to false. Returns a non-OK status if 'path' cannot be resolved
-  /// against the file's schema (e.g., unrecognized collection schema).
-  ///
-  /// Tries to resolve assuming either two- or three-level array encoding in
-  /// 'schema_'. Returns a bad status if resolution fails in both cases.
-  Status ResolvePath(const SchemaPath& path, SchemaNode** node, bool* pos_field,
-      bool* missing_field);
-
-  /// The 'array_encoding' parameter determines whether to assume one-, two-, or
-  /// three-level array encoding. The returned status is not logged (i.e. it's an expected
-  /// error).
-  enum ArrayEncoding {
-    ONE_LEVEL,
-    TWO_LEVEL,
-    THREE_LEVEL
-  };
-  Status ResolvePathHelper(ArrayEncoding array_encoding, const SchemaPath& path,
-      SchemaNode** node, bool* pos_field, bool* missing_field);
-
-  /// Helper functions for ResolvePathHelper().
-
-  /// Advances 'node' to one of its children based on path[next_idx] and
-  /// 'col_type'. 'col_type' is NULL if 'node' is the root node, otherwise it's the type
-  /// associated with 'node'. Returns the child node or sets 'missing_field' to true.
-  SchemaNode* NextSchemaNode(const ColumnType* col_type, const SchemaPath& path,
-      int next_idx, SchemaNode* node, bool* missing_field);
-
-  /// Returns the index of 'node's child with 'name', or the number of children if not
-  /// found.
-  int FindChildWithName(SchemaNode* node, const string& name);
-
-  /// The ResolvePathHelper() logic for arrays.
-  Status ResolveArray(ArrayEncoding array_encoding, const SchemaPath& path, int idx,
-    SchemaNode** node, bool* pos_field, bool* missing_field);
-
-  /// The ResolvePathHelper() logic for maps.
-  Status ResolveMap(const SchemaPath& path, int idx, SchemaNode** node,
-      bool* missing_field);
-
-  /// The ResolvePathHelper() logic for scalars (just does validation since there's no
-  /// more actual work to be done).
-  Status ValidateScalarNode(const SchemaNode& node, const ColumnType& col_type,
-      const SchemaPath& path, int idx);
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index d85ad49..3914845 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -525,7 +525,7 @@ Status HdfsRCFileScanner::ProcessRange() {
         if (error_in_row) {
           error_in_row = false;
           ErrorMsg msg(TErrorCode::GENERAL, Substitute("file: $0", stream_->filename()));
-          RETURN_IF_ERROR(LogOrReturnError(msg));
+          RETURN_IF_ERROR(state_->LogOrReturnError(msg));
         }
 
         current_row->SetTuple(scan_node_->tuple_idx(), tuple);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 5abd346..4f63a73 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -671,23 +671,3 @@ void HdfsScanner::ReportColumnParseError(const SlotDescriptor* desc,
   }
 }
 
-Status HdfsScanner::LogOrReturnError(const ErrorMsg& message) const {
-  DCHECK_NE(message.error(), TErrorCode::OK);
-  // If either abort_on_error=true or the error necessitates execution stops
-  // immediately, return an error status.
-  if (state_->abort_on_error() ||
-      message.error() == TErrorCode::MEM_LIMIT_EXCEEDED ||
-      message.error() == TErrorCode::CANCELLED) {
-    return Status(message);
-  }
-  // Otherwise, add the error to the error log and continue.
-  state_->LogError(message);
-  return Status::OK();
-}
-
-string HdfsScanner::PrintPath(const SchemaPath& path, int subpath_idx) const {
-  SchemaPath::const_iterator subpath_end =
-      subpath_idx == -1 ? path.end() : path.begin() + subpath_idx + 1;
-  SchemaPath subpath(path.begin(), subpath_end);
-  return impala::PrintPath(*scan_node_->hdfs_table(), subpath);
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 9069451..7a723b8 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -408,17 +408,6 @@ class HdfsScanner {
     return reinterpret_cast<TupleRow*>(mem + batch_->row_byte_size());
   }
 
-  /// Given an error message, determine whether execution should be aborted and, if so,
-  /// return the corresponding error status. Otherwise, log the error and return
-  /// Status::OK(). Execution is aborted if the ABORT_ON_ERROR query option is set to
-  /// true or the error is not recoverable and should be handled upstream.
-  Status LogOrReturnError(const ErrorMsg& message) const;
-
-  // Convenience function for calling the PrintPath() function in
-  // debug-util. 'subpath_idx' can be specified in order to truncate the output to end on
-  // the i-th element of 'path' (inclusive).
-  string PrintPath(const SchemaPath& path, int subpath_idx = -1) const;
-
   /// Simple wrapper around scanner_conjunct_ctxs_. Used in the codegen'd version of
   /// WriteCompleteTuple() because it's easier than writing IR to access
   /// scanner_conjunct_ctxs_.
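
With the declaration above removed, scanners now report recoverable errors through RuntimeState (state_->LogOrReturnError(), as the call sites updated earlier in this commit show). A sketch of the decision logic those call sites rely on, written under the assumption that the moved method keeps the behavior of the helper deleted from hdfs-scanner.cc:

    Status RuntimeState::LogOrReturnError(const ErrorMsg& message) {
      DCHECK_NE(message.error(), TErrorCode::OK);
      // Abort if the query set abort_on_error or the error is not recoverable.
      if (abort_on_error() ||
          message.error() == TErrorCode::MEM_LIMIT_EXCEEDED ||
          message.error() == TErrorCode::CANCELLED) {
        return Status(message);
      }
      // Otherwise record the error in the error log and keep scanning.
      LogError(message);
      return Status::OK();
    }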

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index dcd3081..6d45880 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -286,7 +286,8 @@ Status HdfsTextScanner::FinishScanRange() {
         stringstream ss;
         ss << "Read failed while trying to finish scan range: " << stream_->filename()
            << ":" << stream_->file_offset() << endl << status.GetDetail();
-        RETURN_IF_ERROR(LogOrReturnError(ErrorMsg(TErrorCode::GENERAL, ss.str())));
+        RETURN_IF_ERROR(state_->LogOrReturnError(
+            ErrorMsg(TErrorCode::GENERAL, ss.str())));
       } else if (!partial_tuple_empty_ || !boundary_column_.IsEmpty() ||
           !boundary_row_.IsEmpty() ||
           (delimited_text_parser_->HasUnfinishedTuple() &&

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
new file mode 100644
index 0000000..280c206
--- /dev/null
+++ b/be/src/exec/parquet-column-readers.cc
@@ -0,0 +1,1093 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "parquet-column-readers.h"
+
+#include <boost/scoped_ptr.hpp>
+#include <string>
+#include <sstream>
+#include <gflags/gflags.h>
+#include <gutil/strings/substitute.h>
+
+#include "exec/hdfs-parquet-scanner.h"
+#include "exec/parquet-metadata-utils.h"
+#include "exec/parquet-scratch-tuple-batch.h"
+#include "exec/read-write-util.h"
+#include "gutil/bits.h"
+#include "rpc/thrift-util.h"
+#include "runtime/collection-value-builder.h"
+#include "runtime/tuple-row.h"
+#include "runtime/tuple.h"
+#include "runtime/runtime-state.h"
+#include "runtime/mem-pool.h"
+#include "util/codec.h"
+#include "util/debug-util.h"
+#include "util/dict-encoding.h"
+#include "util/rle-encoding.h"
+
+#include "common/names.h"
+
+using strings::Substitute;
+
+// Provide a workaround for IMPALA-1658.
+DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
+    "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
+    "be converted from UTC to local time. Writes are unaffected.");
+
+// Max data page header size in bytes. This is an estimate and only needs to be an upper
+// bound. It is theoretically possible to have a page header of any size due to string
+// value statistics, but in practice we'll have trouble reading string values this large.
+// Also, this limit is in place to prevent impala from reading corrupt parquet files.
+DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
+
+namespace impala {
+
+const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate "
+    "$1 bytes for $2.";
+
+Status ParquetLevelDecoder::Init(const string& filename,
+    parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
+    int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
+  encoding_ = encoding;
+  max_level_ = max_level;
+  num_buffered_values_ = num_buffered_values;
+  filename_ = filename;
+  RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
+
+  // Return because there is no level data to read, e.g., required field.
+  if (max_level == 0) return Status::OK();
+
+  int32_t num_bytes = 0;
+  switch (encoding) {
+    case parquet::Encoding::RLE: {
+      Status status;
+      if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) {
+        return status;
+      }
+      if (num_bytes < 0) {
+        return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes);
+      }
+      int bit_width = Bits::Log2Ceiling64(max_level + 1);
+      Reset(*data, num_bytes, bit_width);
+      break;
+    }
+    case parquet::Encoding::BIT_PACKED:
+      num_bytes = BitUtil::Ceil(num_buffered_values, 8);
+      bit_reader_.Reset(*data, num_bytes);
+      break;
+    default: {
+      stringstream ss;
+      ss << "Unsupported encoding: " << encoding;
+      return Status(ss.str());
+    }
+  }
+  DCHECK_GT(num_bytes, 0);
+  *data += num_bytes;
+  *data_size -= num_bytes;
+  return Status::OK();
+}
+
+Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) {
+  num_cached_levels_ = 0;
+  cached_level_idx_ = 0;
+  // Memory has already been allocated.
+  if (cached_levels_ != NULL) {
+    DCHECK_EQ(cache_size_, cache_size);
+    return Status::OK();
+  }
+
+  cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
+  if (cached_levels_ == NULL) {
+    return pool->mem_tracker()->MemLimitExceeded(
+        NULL, "Definition level cache", cache_size);
+  }
+  memset(cached_levels_, 0, cache_size);
+  cache_size_ = cache_size;
+  return Status::OK();
+}
+
+inline int16_t ParquetLevelDecoder::ReadLevel() {
+  bool valid;
+  uint8_t level;
+  if (encoding_ == parquet::Encoding::RLE) {
+    valid = Get(&level);
+  } else {
+    DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
+    valid = bit_reader_.GetValue(1, &level);
+  }
+  return LIKELY(valid) ? level : HdfsParquetScanner::INVALID_LEVEL;
+}
+
+Status ParquetLevelDecoder::CacheNextBatch(int batch_size) {
+  DCHECK_LE(batch_size, cache_size_);
+  cached_level_idx_ = 0;
+  if (max_level_ > 0) {
+    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_))) {
+      return Status(decoding_error_code_, num_buffered_values_, filename_);
+    }
+  } else {
+    // No levels to read, e.g., because the field is required. The cache was
+    // already initialized with all zeros, so we can hand out those values.
+    DCHECK_EQ(max_level_, 0);
+    num_cached_levels_ = batch_size;
+  }
+  return Status::OK();
+}
+
+bool ParquetLevelDecoder::FillCache(int batch_size,
+    int* num_cached_levels) {
+  DCHECK(num_cached_levels != NULL);
+  int num_values = 0;
+  if (encoding_ == parquet::Encoding::RLE) {
+    while (true) {
+      // Add RLE encoded values by repeating the current value this number of times.
+      uint32_t num_repeats_to_set =
+          min<uint32_t>(repeat_count_, batch_size - num_values);
+      memset(cached_levels_ + num_values, current_value_, num_repeats_to_set);
+      num_values += num_repeats_to_set;
+      repeat_count_ -= num_repeats_to_set;
+
+      // Add remaining literal values, if any.
+      uint32_t num_literals_to_set =
+          min<uint32_t>(literal_count_, batch_size - num_values);
+      int num_values_end = min<uint32_t>(num_values + literal_count_, batch_size);
+      for (; num_values < num_values_end; ++num_values) {
+        bool valid = bit_reader_.GetValue(bit_width_, &cached_levels_[num_values]);
+        if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
+      }
+      literal_count_ -= num_literals_to_set;
+
+      if (num_values == batch_size) break;
+      if (UNLIKELY(!NextCounts<int16_t>())) return false;
+      if (repeat_count_ > 0 && current_value_ > max_level_) return false;
+    }
+  } else {
+    DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
+    for (; num_values < batch_size; ++num_values) {
+      bool valid = bit_reader_.GetValue(1, &cached_levels_[num_values]);
+      if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
+    }
+  }
+  *num_cached_levels = num_values;
+  return true;
+}
+
+/// Per column type reader. If MATERIALIZED is true, the column values are materialized
+/// into the slot described by slot_desc. If MATERIALIZED is false, the column values
+/// are not materialized, but the position can be accessed.
+template<typename T, bool MATERIALIZED>
+class ScalarColumnReader : public BaseScalarColumnReader {
+ public:
+  ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : BaseScalarColumnReader(parent, node, slot_desc),
+      dict_decoder_init_(false) {
+    if (!MATERIALIZED) {
+      // We're not materializing any values, just counting them. No need (or ability) to
+      // initialize state used to materialize values.
+      DCHECK(slot_desc_ == NULL);
+      return;
+    }
+
+    DCHECK(slot_desc_ != NULL);
+    DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
+    if (slot_desc_->type().type == TYPE_DECIMAL) {
+      fixed_len_size_ = ParquetPlainEncoder::DecimalSize(slot_desc_->type());
+    } else if (slot_desc_->type().type == TYPE_VARCHAR) {
+      fixed_len_size_ = slot_desc_->type().len;
+    } else {
+      fixed_len_size_ = -1;
+    }
+    needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
+        // TODO: Add logic to detect file versions that have unconverted TIMESTAMP
+        // values. Currently all versions have converted values.
+        (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
+        slot_desc_->type().type == TYPE_TIMESTAMP &&
+        parent->file_version_.application == "parquet-mr");
+  }
+
+  virtual ~ScalarColumnReader() { }
+
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
+    return ReadValue<true>(pool, tuple);
+  }
+
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
+    return ReadValue<false>(pool, tuple);
+  }
+
+  virtual bool NeedsSeedingForBatchedReading() const { return false; }
+
+  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values);
+  }
+
+  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    return ReadValueBatch<false>(pool, max_values, tuple_size, tuple_mem, num_values);
+  }
+
+ protected:
+  template <bool IN_COLLECTION>
+  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
+    // NextLevels() should have already been called and def and rep levels should be in
+    // valid range.
+    DCHECK_GE(rep_level_, 0);
+    DCHECK_LE(rep_level_, max_rep_level());
+    DCHECK_GE(def_level_, 0);
+    DCHECK_LE(def_level_, max_def_level());
+    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+        "Caller should have called NextLevels() until we are ready to read a value";
+
+    if (MATERIALIZED) {
+      if (def_level_ >= max_def_level()) {
+        if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
+          if (!ReadSlot<true>(tuple->GetSlot(tuple_offset_), pool)) return false;
+        } else {
+          if (!ReadSlot<false>(tuple->GetSlot(tuple_offset_), pool)) return false;
+        }
+      } else {
+        tuple->SetNull(null_indicator_offset_);
+      }
+    }
+    return NextLevels<IN_COLLECTION>();
+  }
+
+  /// Implementation of the ReadValueBatch() functions specialized for this
+  /// column reader type. This function drives the reading of data pages and
+  /// caching of rep/def levels. Once a data page and cached levels are available,
+  /// it calls into a more specialized MaterializeValueBatch() for doing the actual
+  /// value materialization using the level caches.
+  template<bool IN_COLLECTION>
+  bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    // Repetition level is only present if this column is nested in a collection type.
+    if (!IN_COLLECTION) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
+    if (IN_COLLECTION) DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
+
+    int val_count = 0;
+    bool continue_execution = true;
+    while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+      // Read next page if necessary.
+      if (num_buffered_values_ == 0) {
+        if (!NextPage()) {
+          continue_execution = parent_->parse_status_.ok();
+          continue;
+        }
+      }
+
+      // Fill def/rep level caches if they are empty.
+      int level_batch_size = min(parent_->state_->batch_size(), num_buffered_values_);
+      if (!def_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(def_levels_.CacheNextBatch(level_batch_size));
+      }
+      // We only need the repetition levels for populating the position slot since we
+      // are only populating top-level tuples.
+      if (IN_COLLECTION && pos_slot_desc_ != NULL && !rep_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(rep_levels_.CacheNextBatch(level_batch_size));
+      }
+      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+
+      // This special case is most efficiently handled here directly.
+      if (!MATERIALIZED && !IN_COLLECTION) {
+        int vals_to_add = min(def_levels_.CacheRemaining(), max_values - val_count);
+        val_count += vals_to_add;
+        def_levels_.CacheSkipLevels(vals_to_add);
+        num_buffered_values_ -= vals_to_add;
+        continue;
+      }
+
+      // Read data page and cached levels to materialize values.
+      int cache_start_idx = def_levels_.CacheCurrIdx();
+      uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
+      int remaining_val_capacity = max_values - val_count;
+      int ret_val_count = 0;
+      if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
+        continue_execution = MaterializeValueBatch<IN_COLLECTION, true>(
+            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      } else {
+        continue_execution = MaterializeValueBatch<IN_COLLECTION, false>(
+            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      }
+      val_count += ret_val_count;
+      num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
+    }
+    *num_values = val_count;
+    return continue_execution;
+  }
+
+  /// Helper function for ReadValueBatch() above that performs value materialization.
+  /// It assumes a data page with remaining values is available, and that the def/rep
+  /// level caches have been populated.
+  /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not
+  /// handled in this function.
+  template<bool IN_COLLECTION, bool IS_DICT_ENCODED>
+  bool MaterializeValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    DCHECK(MATERIALIZED || IN_COLLECTION);
+    DCHECK_GT(num_buffered_values_, 0);
+    DCHECK(def_levels_.CacheHasNext());
+    if (IN_COLLECTION && pos_slot_desc_ != NULL) DCHECK(rep_levels_.CacheHasNext());
+
+    uint8_t* curr_tuple = tuple_mem;
+    int val_count = 0;
+    while (def_levels_.CacheHasNext()) {
+      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+      int def_level = def_levels_.CacheGetNext();
+
+      if (IN_COLLECTION) {
+        if (def_level < def_level_of_immediate_repeated_ancestor()) {
+          // A containing repeated field is empty or NULL. Skip the value but
+          // move to the next repetition level if necessary.
+          if (pos_slot_desc_ != NULL) rep_levels_.CacheGetNext();
+          continue;
+        }
+        if (pos_slot_desc_ != NULL) {
+          int rep_level = rep_levels_.CacheGetNext();
+          // Reset position counter if we are at the start of a new parent collection.
+          if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
+          void* pos_slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
+          *reinterpret_cast<int64_t*>(pos_slot) = pos_current_value_++;
+        }
+      }
+
+      if (MATERIALIZED) {
+        if (def_level >= max_def_level()) {
+          bool continue_execution =
+              ReadSlot<IS_DICT_ENCODED>(tuple->GetSlot(tuple_offset_), pool);
+          if (UNLIKELY(!continue_execution)) return false;
+        } else {
+          tuple->SetNull(null_indicator_offset_);
+        }
+      }
+
+      curr_tuple += tuple_size;
+      ++val_count;
+      if (UNLIKELY(val_count == max_values)) break;
+    }
+    *num_values = val_count;
+    return true;
+  }
+
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) {
+    if (!dict_decoder_.Reset(values, size, fixed_len_size_)) {
+        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+            slot_desc_->type().DebugString(), "could not decode dictionary");
+    }
+    dict_decoder_init_ = true;
+    *decoder = &dict_decoder_;
+    return Status::OK();
+  }
+
+  virtual bool HasDictionaryDecoder() {
+    return dict_decoder_init_;
+  }
+
+  virtual void ClearDictionaryDecoder() {
+    dict_decoder_init_ = false;
+  }
+
+  virtual Status InitDataPage(uint8_t* data, int size) {
+    page_encoding_ = current_page_header_.data_page_header.encoding;
+    if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
+        page_encoding_ != parquet::Encoding::PLAIN) {
+      stringstream ss;
+      ss << "File '" << filename() << "' is corrupt: unexpected encoding: "
+         << PrintEncoding(page_encoding_) << " for data page of column '"
+         << schema_element().name << "'.";
+      return Status(ss.str());
+    }
+
+    // If slot_desc_ is NULL, dict_decoder_ is uninitialized
+    if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY && slot_desc_ != NULL) {
+      if (!dict_decoder_init_) {
+        return Status("File corrupt. Missing dictionary page.");
+      }
+      dict_decoder_.SetData(data, size);
+    }
+
+    // TODO: Perform filter selectivity checks here.
+    return Status::OK();
+  }
+
+ private:
+  /// Writes the next value into *slot using pool if necessary.
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  template<bool IS_DICT_ENCODED>
+  inline bool ReadSlot(void* slot, MemPool* pool) {
+    T val;
+    T* val_ptr = NeedsConversion() ? &val : reinterpret_cast<T*>(slot);
+    if (IS_DICT_ENCODED) {
+      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
+      if (UNLIKELY(!dict_decoder_.GetValue(val_ptr))) {
+        SetDictDecodeError();
+        return false;
+      }
+    } else {
+      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN);
+      int encoded_len =
+          ParquetPlainEncoder::Decode<T>(data_, data_end_, fixed_len_size_, val_ptr);
+      if (UNLIKELY(encoded_len < 0)) {
+        SetPlainDecodeError();
+        return false;
+      }
+      data_ += encoded_len;
+    }
+    if (UNLIKELY(NeedsConversion() &&
+            !ConvertSlot(&val, reinterpret_cast<T*>(slot), pool))) {
+      return false;
+    }
+    return true;
+  }
+
+  /// Most column readers never require conversion, so we can avoid branches by
+  /// returning constant false. Column readers for types that require conversion
+  /// must specialize this function.
+  inline bool NeedsConversion() const {
+    DCHECK(!needs_conversion_);
+    return false;
+  }
+
+  /// Converts and writes 'src' into 'dst' based on slot_desc_->type().
+  bool ConvertSlot(const T* src, T* dst, MemPool* pool) {
+    DCHECK(false);
+    return false;
+  }
+
+  /// Pull out slow-path Status construction code from ReadSlot() for performance.
+  void __attribute__((noinline)) SetDictDecodeError() {
+    parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(),
+        slot_desc_->type().DebugString(), stream_->file_offset());
+  }
+  void __attribute__((noinline)) SetPlainDecodeError() {
+    parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(),
+        slot_desc_->type().DebugString(), stream_->file_offset());
+  }
+
+  /// Dictionary decoder for decoding column values.
+  DictDecoder<T> dict_decoder_;
+
+  /// True if dict_decoder_ has been initialized with a dictionary page.
+  bool dict_decoder_init_;
+
+  /// true if decoded values must be converted before being written to an output tuple.
+  bool needs_conversion_;
+
+  /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
+  /// the max length for VARCHAR columns. Unused otherwise.
+  int fixed_len_size_;
+};
+
+template<>
+inline bool ScalarColumnReader<StringValue, true>::NeedsConversion() const {
+  return needs_conversion_;
+}
+
+template<>
+bool ScalarColumnReader<StringValue, true>::ConvertSlot(
+    const StringValue* src, StringValue* dst, MemPool* pool) {
+  DCHECK(slot_desc() != NULL);
+  DCHECK(slot_desc()->type().type == TYPE_CHAR);
+  int len = slot_desc()->type().len;
+  StringValue sv;
+  sv.len = len;
+  if (slot_desc()->type().IsVarLenStringType()) {
+    sv.ptr = reinterpret_cast<char*>(pool->TryAllocate(len));
+    if (UNLIKELY(sv.ptr == NULL)) {
+      string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot",
+          len, "StringValue");
+      parent_->parse_status_ =
+          pool->mem_tracker()->MemLimitExceeded(parent_->state_, details, len);
+      return false;
+    }
+  } else {
+    sv.ptr = reinterpret_cast<char*>(dst);
+  }
+  int unpadded_len = min(len, src->len);
+  memcpy(sv.ptr, src->ptr, unpadded_len);
+  StringValue::PadWithSpaces(sv.ptr, len, unpadded_len);
+
+  if (slot_desc()->type().IsVarLenStringType()) *dst = sv;
+  return true;
+}
+
+template<>
+inline bool ScalarColumnReader<TimestampValue, true>::NeedsConversion() const {
+  return needs_conversion_;
+}
+
+template<>
+bool ScalarColumnReader<TimestampValue, true>::ConvertSlot(
+    const TimestampValue* src, TimestampValue* dst, MemPool* pool) {
+  // Conversion should only happen when this flag is enabled.
+  DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
+  *dst = *src;
+  if (dst->HasDateAndTime()) dst->UtcToLocal();
+  return true;
+}
+
+class BoolColumnReader : public BaseScalarColumnReader {
+ public:
+  BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : BaseScalarColumnReader(parent, node, slot_desc) {
+    if (slot_desc_ != NULL) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
+  }
+
+  virtual ~BoolColumnReader() { }
+
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
+    return ReadValue<true>(pool, tuple);
+  }
+
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
+    return ReadValue<false>(pool, tuple);
+  }
+
+ protected:
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) {
+    DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
+                  << "have gotten this far.";
+    return Status::OK();
+  }
+
+  virtual bool HasDictionaryDecoder() {
+    // Decoder should never be created for bools.
+    return false;
+  }
+
+  virtual void ClearDictionaryDecoder() { }
+
+  virtual Status InitDataPage(uint8_t* data, int size) {
+    // Initialize bool decoder
+    bool_values_ = BitReader(data, size);
+    return Status::OK();
+  }
+
+ private:
+  template<bool IN_COLLECTION>
+  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
+    DCHECK(slot_desc_ != NULL);
+    // Def and rep levels should be in valid range.
+    DCHECK_GE(rep_level_, 0);
+    DCHECK_LE(rep_level_, max_rep_level());
+    DCHECK_GE(def_level_, 0);
+    DCHECK_LE(def_level_, max_def_level());
+    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+        "Caller should have called NextLevels() until we are ready to read a value";
+
+    if (def_level_ >= max_def_level()) {
+      return ReadSlot<IN_COLLECTION>(tuple->GetSlot(tuple_offset_), pool);
+    } else {
+      // Null value
+      tuple->SetNull(null_indicator_offset_);
+      return NextLevels<IN_COLLECTION>();
+    }
+  }
+
+  /// Writes the next value into *slot using pool if necessary. Also advances def_level_
+  /// and rep_level_ via NextLevels().
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  template <bool IN_COLLECTION>
+  inline bool ReadSlot(void* slot, MemPool* pool)  {
+    if (!bool_values_.GetValue(1, reinterpret_cast<bool*>(slot))) {
+      parent_->parse_status_ = Status("Invalid bool column.");
+      return false;
+    }
+    return NextLevels<IN_COLLECTION>();
+  }
+
+  BitReader bool_values_;
+};
+
+bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
+    int tuple_size, uint8_t* tuple_mem, int* num_values) {
+  int val_count = 0;
+  bool continue_execution = true;
+  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
+    if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
+      // A containing repeated field is empty or NULL
+      continue_execution = NextLevels();
+      continue;
+    }
+    // Fill in position slot if applicable
+    if (pos_slot_desc_ != NULL) ReadPosition(tuple);
+    continue_execution = ReadValue(pool, tuple);
+    ++val_count;
+  }
+  *num_values = val_count;
+  return continue_execution;
+}
+
+bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
+    int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
+  int val_count = 0;
+  bool continue_execution = true;
+  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
+    continue_execution = ReadNonRepeatedValue(pool, tuple);
+    ++val_count;
+  }
+  *num_values = val_count;
+  return continue_execution;
+}
+
+void ParquetColumnReader::ReadPosition(Tuple* tuple) {
+  DCHECK(pos_slot_desc() != NULL);
+  // NextLevels() should have already been called
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(pos_current_value_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a value";
+
+  void* slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
+  *reinterpret_cast<int64_t*>(slot) = pos_current_value_++;
+}
+
+// In Impala 1.1 (and Impala-internal 1.2 builds), we had a bug where the dictionary
+// page metadata was not set. Returns true if the file was written by one of those
+// versions and the compatibility workaround needs to be used.
+static bool RequiresSkippedDictionaryHeaderCheck(
+    const ParquetFileVersion& v) {
+  if (v.application != "impala") return false;
+  return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
+}
+
+Status BaseScalarColumnReader::ReadDataPage() {
+  Status status;
+  uint8_t* buffer;
+
+  // We're about to move to the next data page.  The previous data page is
+  // now complete, pass along the memory allocated for it.
+  parent_->scratch_batch_->mem_pool()->AcquireData(decompressed_data_pool_.get(), false);
+
+  // Read the next data page, skipping page types we don't care about.
+  // We break out of this loop on the non-error case (a data page was found or we read all
+  // the pages).
+  while (true) {
+    DCHECK_EQ(num_buffered_values_, 0);
+    if (num_values_read_ == metadata_->num_values) {
+      // No more pages to read
+      // TODO: should we check for stream_->eosr()?
+      break;
+    } else if (num_values_read_ > metadata_->num_values) {
+      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
+          metadata_->num_values, num_values_read_, node_.element->name, filename());
+      RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
+      return Status::OK();
+    }
+
+    int64_t buffer_size;
+    RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
+    if (buffer_size == 0) {
+      // The data pages contain fewer values than stated in the column metadata.
+      DCHECK(stream_->eosr());
+      DCHECK_LT(num_values_read_, metadata_->num_values);
+      // TODO for 2.3: node_.element->name isn't necessarily useful
+      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
+          metadata_->num_values, num_values_read_, node_.element->name, filename());
+      RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
+      return Status::OK();
+    }
+
+    // We don't know the actual header size until the thrift object is deserialized.  Loop
+    // until we successfully deserialize the header or exceed the maximum header size.
+    uint32_t header_size;
+    while (true) {
+      header_size = buffer_size;
+      status = DeserializeThriftMsg(
+          buffer, &header_size, true, &current_page_header_);
+      if (status.ok()) break;
+
+      if (buffer_size >= FLAGS_max_page_header_size) {
+        stringstream ss;
+        ss << "ParquetScanner: could not read data page because page header exceeded "
+           << "maximum size of "
+           << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
+        status.AddDetail(ss.str());
+        return status;
+      }
+
+      // Didn't read entire header, increase buffer size and try again
+      Status status;
+      int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
+      bool success = stream_->GetBytes(
+          new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
+      if (!success) {
+        DCHECK(!status.ok());
+        return status;
+      }
+      DCHECK(status.ok());
+
+      if (buffer_size == new_buffer_size) {
+        DCHECK_NE(new_buffer_size, 0);
+        return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
+      }
+      DCHECK_GT(new_buffer_size, buffer_size);
+      buffer_size = new_buffer_size;
+    }
+
+    // Successfully deserialized current_page_header_
+    if (!stream_->SkipBytes(header_size, &status)) return status;
+
+    int data_size = current_page_header_.compressed_page_size;
+    int uncompressed_size = current_page_header_.uncompressed_page_size;
+
+    if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
+      if (slot_desc_ == NULL) {
+        // Skip processing the dictionary page if we don't need to decode any values. In
+        // addition to being unnecessary, we are likely unable to successfully decode the
+        // dictionary values because we don't necessarily create the right type of scalar
+        // reader if there's no slot to read into (see CreateReader()).
+        if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+        continue;
+      }
+
+      if (HasDictionaryDecoder()) {
+        return Status("Column chunk should not contain two dictionary pages.");
+      }
+      if (node_.element->type == parquet::Type::BOOLEAN) {
+        return Status("Unexpected dictionary page. Dictionary page is not"
+            " supported for booleans.");
+      }
+      const parquet::DictionaryPageHeader* dict_header = NULL;
+      if (current_page_header_.__isset.dictionary_page_header) {
+        dict_header = &current_page_header_.dictionary_page_header;
+      } else {
+        if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
+          return Status("Dictionary page does not have dictionary header set.");
+        }
+      }
+      if (dict_header != NULL &&
+          dict_header->encoding != parquet::Encoding::PLAIN &&
+          dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) {
+        return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
+            "for dictionary pages.");
+      }
+
+      if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+      data_end_ = data_ + data_size;
+
+      uint8_t* dict_values = NULL;
+      if (decompressor_.get() != NULL) {
+        dict_values = parent_->dictionary_pool_->TryAllocate(uncompressed_size);
+        if (UNLIKELY(dict_values == NULL)) {
+          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
+              uncompressed_size, "dictionary");
+          return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
+              parent_->state_, details, uncompressed_size);
+        }
+        RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
+            &uncompressed_size, &dict_values));
+        VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
+        if (current_page_header_.uncompressed_page_size != uncompressed_size) {
+          return Status(Substitute("Error decompressing dictionary page in file '$0'. "
+              "Expected $1 uncompressed bytes but got $2", filename(),
+              current_page_header_.uncompressed_page_size, uncompressed_size));
+        }
+        data_size = uncompressed_size;
+      } else {
+        if (current_page_header_.uncompressed_page_size != data_size) {
+          return Status(Substitute("Error reading dictionary page in file '$0'. "
+              "Expected $1 bytes but got $2", filename(),
+              current_page_header_.uncompressed_page_size, data_size));
+        }
+        // Copy dictionary from io buffer (which will be recycled as we read
+        // more data) to a new buffer
+        dict_values = parent_->dictionary_pool_->TryAllocate(data_size);
+        if (UNLIKELY(dict_values == NULL)) {
+          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
+              data_size, "dictionary");
+          return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
+              parent_->state_, details, data_size);
+        }
+        memcpy(dict_values, data_, data_size);
+      }
+
+      DictDecoderBase* dict_decoder;
+      RETURN_IF_ERROR(CreateDictionaryDecoder(dict_values, data_size, &dict_decoder));
+      if (dict_header != NULL &&
+          dict_header->num_values != dict_decoder->num_entries()) {
+        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+            slot_desc_->type().DebugString(),
+            Substitute("Expected $0 entries but data contained $1 entries",
+            dict_header->num_values, dict_decoder->num_entries()));
+      }
+      // Done with dictionary page, read next page
+      continue;
+    }
+
+    if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
+      // We can safely skip non-data pages
+      if (!stream_->SkipBytes(data_size, &status)) return status;
+      continue;
+    }
+
+    // Read Data Page
+    // TODO: when we start using page statistics, we will need to ignore certain corrupt
+    // statistics. See IMPALA-2208 and PARQUET-251.
+    if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+    data_end_ = data_ + data_size;
+    num_buffered_values_ = current_page_header_.data_page_header.num_values;
+    num_values_read_ += num_buffered_values_;
+
+    if (decompressor_.get() != NULL) {
+      SCOPED_TIMER(parent_->decompress_timer_);
+      uint8_t* decompressed_buffer =
+          decompressed_data_pool_->TryAllocate(uncompressed_size);
+      if (UNLIKELY(decompressed_buffer == NULL)) {
+        string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
+            uncompressed_size, "decompressed data");
+        return decompressed_data_pool_->mem_tracker()->MemLimitExceeded(
+            parent_->state_, details, uncompressed_size);
+      }
+      RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
+          current_page_header_.compressed_page_size, data_, &uncompressed_size,
+          &decompressed_buffer));
+      VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
+                << " to " << uncompressed_size;
+      if (current_page_header_.uncompressed_page_size != uncompressed_size) {
+        return Status(Substitute("Error decompressing data page in file '$0'. "
+            "Expected $1 uncompressed bytes but got $2", filename(),
+            current_page_header_.uncompressed_page_size, uncompressed_size));
+      }
+      data_ = decompressed_buffer;
+      data_size = current_page_header_.uncompressed_page_size;
+      data_end_ = data_ + data_size;
+    } else {
+      DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
+      if (current_page_header_.compressed_page_size != uncompressed_size) {
+        return Status(Substitute("Error reading data page in file '$0'. "
+            "Expected $1 bytes but got $2", filename(),
+            current_page_header_.compressed_page_size, uncompressed_size));
+      }
+    }
+
+    // Initialize the repetition level data
+    RETURN_IF_ERROR(rep_levels_.Init(filename(),
+        current_page_header_.data_page_header.repetition_level_encoding,
+        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        max_rep_level(), num_buffered_values_,
+        &data_, &data_size));
+
+    // Initialize the definition level data
+    RETURN_IF_ERROR(def_levels_.Init(filename(),
+        current_page_header_.data_page_header.definition_level_encoding,
+        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        max_def_level(), num_buffered_values_, &data_, &data_size));
+
+    // Data can be empty if the column contains all NULLs
+    if (data_size != 0) RETURN_IF_ERROR(InitDataPage(data_, data_size));
+    break;
+  }
+
+  return Status::OK();
+}
+
+template <bool ADVANCE_REP_LEVEL>
+bool BaseScalarColumnReader::NextLevels() {
+  if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
+
+  if (UNLIKELY(num_buffered_values_ == 0)) {
+    if (!NextPage()) return parent_->parse_status_.ok();
+  }
+  --num_buffered_values_;
+
+  // Definition level is not present if column and any containing structs are required.
+  def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
+
+  if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
+    // Repetition level is only present if this column is nested in any collection type.
+    rep_level_ = rep_levels_.ReadLevel();
+    // Reset position counter if we are at the start of a new parent collection.
+    if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
+  }
+
+  return parent_->parse_status_.ok();
+}
+
+bool BaseScalarColumnReader::NextPage() {
+  parent_->assemble_rows_timer_.Stop();
+  parent_->parse_status_ = ReadDataPage();
+  if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+  if (num_buffered_values_ == 0) {
+    rep_level_ = HdfsParquetScanner::ROW_GROUP_END;
+    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
+    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
+    return false;
+  }
+  parent_->assemble_rows_timer_.Start();
+  return true;
+}
+
+bool CollectionColumnReader::NextLevels() {
+  DCHECK(!children_.empty());
+  DCHECK_LE(rep_level_, new_collection_rep_level());
+  for (int c = 0; c < children_.size(); ++c) {
+    do {
+      // TODO(skye): verify somewhere that all column readers are at end
+      if (!children_[c]->NextLevels()) return false;
+    } while (children_[c]->rep_level() > new_collection_rep_level());
+  }
+  UpdateDerivedState();
+  return true;
+}
+
+bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a value";
+
+  if (tuple_offset_ == -1) {
+    return CollectionColumnReader::NextLevels();
+  } else if (def_level_ >= max_def_level()) {
+    return ReadSlot(tuple->GetSlot(tuple_offset_), pool);
+  } else {
+    // Null value
+    tuple->SetNull(null_indicator_offset_);
+    return CollectionColumnReader::NextLevels();
+  }
+}
+
+bool CollectionColumnReader::ReadNonRepeatedValue(
+    MemPool* pool, Tuple* tuple) {
+  return CollectionColumnReader::ReadValue(pool, tuple);
+}
+
+bool CollectionColumnReader::ReadSlot(void* slot, MemPool* pool) {
+  DCHECK(!children_.empty());
+  DCHECK_LE(rep_level_, new_collection_rep_level());
+
+  // Recursively read the collection into a new CollectionValue.
+  CollectionValue* coll_slot = reinterpret_cast<CollectionValue*>(slot);
+  *coll_slot = CollectionValue();
+  CollectionValueBuilder builder(
+      coll_slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
+  bool continue_execution = parent_->AssembleCollection(
+      children_, new_collection_rep_level(), &builder);
+  if (!continue_execution) return false;
+
+  // AssembleCollection() advances child readers, so we don't need to call NextLevels()
+  UpdateDerivedState();
+  return true;
+}
+
+void CollectionColumnReader::UpdateDerivedState() {
+  // We don't need to cap our def_level_ at max_def_level(). We always check def_level_
+  // >= max_def_level() to check if the collection is defined.
+  // TODO(skye): consider capping def_level_ at max_def_level()
+  def_level_ = children_[0]->def_level();
+  rep_level_ = children_[0]->rep_level();
+
+  // All children should have been advanced to the beginning of the next collection
+  for (int i = 0; i < children_.size(); ++i) {
+    DCHECK_EQ(children_[i]->rep_level(), rep_level_);
+    if (def_level_ < max_def_level()) {
+      // Collection not defined
+      FILE_CHECK_EQ(children_[i]->def_level(), def_level_);
+    } else {
+      // Collection is defined
+      FILE_CHECK_GE(children_[i]->def_level(), max_def_level());
+    }
+  }
+
+  if (RowGroupAtEnd()) {
+    // No more values
+    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
+  } else if (rep_level_ <= max_rep_level() - 2) {
+    // Reset position counter if we are at the start of a new parent collection (i.e.,
+    // the current collection is the first item in a new parent collection).
+    pos_current_value_ = 0;
+  }
+}
+
+ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
+    bool is_collection_field, const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
+  ParquetColumnReader* reader = NULL;
+  if (is_collection_field) {
+    // Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
+    reader = new CollectionColumnReader(parent, node, slot_desc);
+  } else if (slot_desc != NULL) {
+    // Create the appropriate ScalarColumnReader type to read values into 'slot_desc'
+    switch (slot_desc->type().type) {
+      case TYPE_BOOLEAN:
+        reader = new BoolColumnReader(parent, node, slot_desc);
+        break;
+      case TYPE_TINYINT:
+        reader = new ScalarColumnReader<int8_t, true>(parent, node, slot_desc);
+        break;
+      case TYPE_SMALLINT:
+        reader = new ScalarColumnReader<int16_t, true>(parent, node, slot_desc);
+        break;
+      case TYPE_INT:
+        reader = new ScalarColumnReader<int32_t, true>(parent, node, slot_desc);
+        break;
+      case TYPE_BIGINT:
+        reader = new ScalarColumnReader<int64_t, true>(parent, node, slot_desc);
+        break;
+      case TYPE_FLOAT:
+        reader = new ScalarColumnReader<float, true>(parent, node, slot_desc);
+        break;
+      case TYPE_DOUBLE:
+        reader = new ScalarColumnReader<double, true>(parent, node, slot_desc);
+        break;
+      case TYPE_TIMESTAMP:
+        reader = new ScalarColumnReader<TimestampValue, true>(parent, node, slot_desc);
+        break;
+      case TYPE_STRING:
+      case TYPE_VARCHAR:
+      case TYPE_CHAR:
+        reader = new ScalarColumnReader<StringValue, true>(parent, node, slot_desc);
+        break;
+      case TYPE_DECIMAL:
+        switch (slot_desc->type().GetByteSize()) {
+          case 4:
+            reader = new ScalarColumnReader<Decimal4Value, true>(
+                parent, node, slot_desc);
+            break;
+          case 8:
+            reader = new ScalarColumnReader<Decimal8Value, true>(
+                parent, node, slot_desc);
+            break;
+          case 16:
+            reader = new ScalarColumnReader<Decimal16Value, true>(
+                parent, node, slot_desc);
+            break;
+        }
+        break;
+      default:
+        DCHECK(false) << slot_desc->type().DebugString();
+    }
+  } else {
+    // Special case for counting scalar values (e.g. count(*), no materialized columns in
+    // the file, only materializing a position slot). We won't actually read any values,
+    // only the rep and def levels, so it doesn't matter what kind of reader we make.
+    reader = new ScalarColumnReader<int8_t, false>(parent, node, slot_desc);
+  }
+  return parent->obj_pool_.Add(reader);
+}
+
+}
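
The batched read path added in this file is driven by ParquetLevelDecoder's level cache (CacheNextBatch(), CacheHasNext(), CacheGetNext(), declared in the header below). Reduced to a sketch with illustrative names ('decoder', 'batch_size', 'MaterializeValueOrSetNull'), the consumption pattern in MaterializeValueBatch() looks like this:

    RETURN_IF_ERROR(decoder.CacheNextBatch(batch_size));
    while (decoder.CacheHasNext()) {
      uint8_t def_level = decoder.CacheGetNext();
      // A value is materialized only when def_level reaches the column's maximum
      // definition level; lower levels mean a NULL value or an empty/NULL
      // enclosing collection.
      MaterializeValueOrSetNull(def_level);
    }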

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
new file mode 100644
index 0000000..3c26084
--- /dev/null
+++ b/be/src/exec/parquet-column-readers.h
@@ -0,0 +1,500 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_PARQUET_COLUMN_READERS_H
+#define IMPALA_PARQUET_COLUMN_READERS_H
+
+#include <boost/scoped_ptr.hpp>
+
+#include "exec/hdfs-parquet-scanner.h"
+#include "util/codec.h"
+#include "util/dict-encoding.h"
+#include "util/rle-encoding.h"
+
+namespace impala {
+
+class Tuple;
+class MemPool;
+
+/// Decoder for all supported Parquet level encodings. Optionally reads, decodes, and
+/// caches level values in batches.
+/// Level values are unsigned 8-bit integers because we support a maximum nesting
+/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
+/// populating the level cache (e.g., with RLE we can memset() repeated values).
+///
+/// Inherits from RleDecoder instead of containing one for performance reasons.
+/// The containment design would require two BitReaders per column reader. The extra
+/// BitReader causes enough bloat for a column reader to require another cache line.
+/// TODO: It is not clear whether the inheritance vs. containment choice still makes
+/// sense with column-wise materialization. The containment design seems cleaner and
+/// we should revisit.
+class ParquetLevelDecoder : public RleDecoder {
+ public:
+  ParquetLevelDecoder(bool is_def_level_decoder)
+    : cached_levels_(NULL),
+      num_cached_levels_(0),
+      cached_level_idx_(0),
+      encoding_(parquet::Encoding::PLAIN),
+      max_level_(0),
+      cache_size_(0),
+      num_buffered_values_(0),
+      decoding_error_code_(is_def_level_decoder ?
+          TErrorCode::PARQUET_DEF_LEVEL_ERROR : TErrorCode::PARQUET_REP_LEVEL_ERROR) {
+  }
+
+  /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
+  /// encoding requires reading metadata from the page header.
+  Status Init(const string& filename, parquet::Encoding::type encoding,
+      MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values,
+      uint8_t** data, int* data_size);
+
+  /// Returns the next level or INVALID_LEVEL if there was an error.
+  inline int16_t ReadLevel();
+
+  /// Decodes and caches the next batch of levels. Resets members associated with the
+  /// cache. Returns a non-ok status if there was a problem decoding a level, or if a
+  /// level was encountered with a value greater than max_level_.
+  Status CacheNextBatch(int batch_size);
+
+  /// Functions for working with the level cache.
+  inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }
+  inline uint8_t CacheGetNext() {
+    DCHECK_LT(cached_level_idx_, num_cached_levels_);
+    return cached_levels_[cached_level_idx_++];
+  }
+  inline void CacheSkipLevels(int num_levels) {
+    DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_);
+    cached_level_idx_ += num_levels;
+  }
+  inline int CacheSize() const { return num_cached_levels_; }
+  inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
+  inline int CacheCurrIdx() const { return cached_level_idx_; }
+
+ private:
+  /// Initializes members associated with the level cache. Allocates memory for
+  /// the cache from pool, if necessary.
+  Status InitCache(MemPool* pool, int cache_size);
+
+  /// Decodes and writes a batch of levels into the cache. Sets the number of
+  /// values written to the cache in *num_cached_levels. Returns false if there was
+  /// an error decoding a level or if there was a level value greater than max_level_.
+  bool FillCache(int batch_size, int* num_cached_levels);
+
+  /// Buffer for a batch of levels. The memory is allocated and owned by the pool
+  /// passed in to Init().
+  uint8_t* cached_levels_;
+  /// Number of valid level values in the cache.
+  int num_cached_levels_;
+  /// Current index into cached_levels_.
+  int cached_level_idx_;
+  parquet::Encoding::type encoding_;
+
+  /// For error checking and reporting.
+  int max_level_;
+  /// Number of level values cached_levels_ has memory allocated for.
+  int cache_size_;
+  /// Number of remaining data values in the current data page.
+  int num_buffered_values_;
+  string filename_;
+  TErrorCode::type decoding_error_code_;
+};
+
+/// Base class for reading a Parquet column. Reads a logical column, not necessarily a
+/// column materialized in the file (e.g. collections). The two subclasses are
+/// BaseScalarColumnReader and CollectionColumnReader. Column readers read one def and rep
+/// level pair at a time. The current def and rep level are exposed to the user, and the
+/// corresponding value (if defined) can optionally be copied into a slot via
+/// ReadValue(). Can also write position slots.
+class ParquetColumnReader {
+ public:
+  /// Creates a column reader for 'node' and associates it with the given parent scanner.
+  /// Adds the new column reader to the parent's object pool.
+  /// 'slot_desc' may be NULL, in which case the returned column reader can only be used
+  /// to read def/rep levels.
+  /// 'is_collection_field' should be set to true if the returned reader is reading a
+  /// collection. This cannot be determined purely by 'node' because a repeated scalar
+  /// node represents both an array and the array's items (in this case
+  /// 'is_collection_field' should be true if the reader reads one value per array, and
+  /// false if it reads one value per item).  The reader is added to the parent scanner's
+  /// object pool. Does not create child readers for collection readers; these must be
+  /// added by the caller.
+  static ParquetColumnReader* Create(const SchemaNode& node, bool is_collection_field,
+      const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
+
+  virtual ~ParquetColumnReader() { }
+
+  int def_level() const { return def_level_; }
+  int rep_level() const { return rep_level_; }
+
+  const SlotDescriptor* slot_desc() const { return slot_desc_; }
+  const parquet::SchemaElement& schema_element() const { return *node_.element; }
+  int16_t max_def_level() const { return max_def_level_; }
+  int16_t max_rep_level() const { return max_rep_level_; }
+  int def_level_of_immediate_repeated_ancestor() const {
+    return node_.def_level_of_immediate_repeated_ancestor;
+  }
+  const SlotDescriptor* pos_slot_desc() const { return pos_slot_desc_; }
+  void set_pos_slot_desc(const SlotDescriptor* pos_slot_desc) {
+    DCHECK(pos_slot_desc_ == NULL);
+    pos_slot_desc_ = pos_slot_desc;
+  }
+
+  /// Returns true if this reader materializes collections (i.e. CollectionValues).
+  virtual bool IsCollectionReader() const { return false; }
+
+  const char* filename() const { return parent_->filename(); };
+
+  /// Read the current value (or null) into 'tuple' for this column. This should only be
+  /// called when a value is defined, i.e., def_level() >=
+  /// def_level_of_immediate_repeated_ancestor() (since empty or NULL collections produce
+  /// no output values), otherwise NextLevels() should be called instead.
+  ///
+  /// Advances this column reader to the next value (i.e. NextLevels() doesn't need to be
+  /// called after calling ReadValue()).
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  ///
+  /// NextLevels() must be called on this reader before calling ReadValue() for the first
+  /// time. This is to initialize the current value that ReadValue() will read.
+  ///
+  /// TODO: this is the function that needs to be codegen'd (e.g. CodegenReadValue())
+  /// The codegened functions from all the materialized cols will then be combined
+  /// into one function.
+  /// TODO: another option is to materialize col by col for the entire row batch in
+  /// one call.  e.g. MaterializeCol would write out 1024 values.  Our row batches
+  /// are currently dense so we'll need to figure out something there.
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) = 0;
+
+  /// Same as ReadValue() but does not advance repetition level. Only valid for columns
+  /// not in collections.
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
+
+  /// Returns true if this reader needs to be seeded with NextLevels() before
+  /// calling ReadValueBatch() or ReadNonRepeatedValueBatch().
+  /// Note that all readers need to be seeded before calling the non-batched ReadValue().
+  virtual bool NeedsSeedingForBatchedReading() const { return true; }
+
+  /// Batched version of ReadValue() that reads up to max_values at once and materializes
+  /// them into tuples in tuple_mem. Returns the number of values actually materialized
+  /// in *num_values. The return value, error behavior and state changes are generally
+  /// the same as in ReadValue(). For example, if an error occurs in the middle of
+  /// materializing a batch then false is returned, and num_values, tuple_mem, as well as
+  /// this column reader are left in an undefined state, assuming that the caller will
+  /// immediately abort execution.
+  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values);
+
+  /// Batched version of ReadNonRepeatedValue() that reads up to max_values at once and
+  /// materializes them into tuples in tuple_mem.
+  /// The return value and error behavior are the same as in ReadValueBatch().
+  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values);
+
+  /// Advances this column reader's def and rep levels to the next logical value, i.e. to
+  /// the next scalar value or the beginning of the next collection, without attempting to
+  /// read the value. This is used to skip past def/rep levels that don't materialize a
+  /// value, such as the def/rep levels corresponding to an empty containing collection.
+  ///
+  /// NextLevels() must be called on this reader before calling ReadValue() for the first
+  /// time. This is to initialize the current value that ReadValue() will read.
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  virtual bool NextLevels() = 0;
+
+  /// Should only be called if pos_slot_desc_ is non-NULL. Writes pos_current_value_ to
+  /// 'tuple' (i.e. "reads" the synthetic position field of the parent collection into
+  /// 'tuple') and increments pos_current_value_.
+  void ReadPosition(Tuple* tuple);
+
+  /// Returns true if this column reader has reached the end of the row group.
+  inline bool RowGroupAtEnd() { return rep_level_ == HdfsParquetScanner::ROW_GROUP_END; }
+
+ protected:
+  HdfsParquetScanner* parent_;
+  const SchemaNode& node_;
+  const SlotDescriptor* slot_desc_;
+
+  /// The slot descriptor for the position field of the tuple, if there is one. NULL if
+  /// there's not. Only one column reader for a given tuple desc will have this set.
+  const SlotDescriptor* pos_slot_desc_;
+
+  /// The next value to write into the position slot, if there is one. 64-bit int because
+  /// the pos slot is always a BIGINT. Set to -1 when this column reader does not have a
+  /// current rep and def level (i.e. before the first NextLevels() call or after the last
+  /// value in the column has been read).
+  int64_t pos_current_value_;
+
+  /// The current repetition and definition levels of this reader. Advanced via
+  /// ReadValue() and NextLevels(). Set to -1 when this column reader does not have a
+  /// current rep and def level (i.e. before the first NextLevels() call or after the last
+  /// value in the column has been read). If this is not inside a collection, rep_level_ is
+  /// always 0.
+  /// int16_t is large enough to hold the valid levels 0-255 and sentinel value -1.
+  /// The maximum values are cached here because they are accessed in inner loops.
+  int16_t rep_level_;
+  int16_t max_rep_level_;
+  int16_t def_level_;
+  int16_t max_def_level_;
+
+  // Cache frequently accessed members of slot_desc_ for perf.
+
+  /// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL.
+  int tuple_offset_;
+
+  /// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL.
+  NullIndicatorOffset null_indicator_offset_;
+
+  ParquetColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : parent_(parent),
+      node_(node),
+      slot_desc_(slot_desc),
+      pos_slot_desc_(NULL),
+      pos_current_value_(HdfsParquetScanner::INVALID_POS),
+      rep_level_(HdfsParquetScanner::INVALID_LEVEL),
+      max_rep_level_(node_.max_rep_level),
+      def_level_(HdfsParquetScanner::INVALID_LEVEL),
+      max_def_level_(node_.max_def_level),
+      tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()),
+      null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset(-1, -1) :
+          slot_desc->null_indicator_offset()) {
+    DCHECK_GE(node_.max_rep_level, 0);
+    DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max());
+    DCHECK_GE(node_.max_def_level, 0);
+    DCHECK_LE(node_.max_def_level, std::numeric_limits<int16_t>::max());
+    // rep_level_ is always valid and equal to 0 if col not in collection.
+    if (max_rep_level() == 0) rep_level_ = 0;
+  }
+};
+
+/// Reader for a single column from the parquet file.  It's associated with a
+/// ScannerContext::Stream and is responsible for decoding the data.  Super class for
+/// per-type column readers. This contains most of the logic; the type-specific functions
+/// must be implemented in the subclass.
+class BaseScalarColumnReader : public ParquetColumnReader {
+ public:
+  BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : ParquetColumnReader(parent, node, slot_desc),
+      data_(NULL),
+      data_end_(NULL),
+      def_levels_(true),
+      rep_levels_(false),
+      page_encoding_(parquet::Encoding::PLAIN_DICTIONARY),
+      num_buffered_values_(0),
+      num_values_read_(0),
+      metadata_(NULL),
+      stream_(NULL),
+      decompressed_data_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
+    DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
+  }
+
+  virtual ~BaseScalarColumnReader() { }
+
+  /// This is called once for each row group in the file.
+  Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) {
+    DCHECK(stream != NULL);
+    DCHECK(metadata != NULL);
+
+    num_buffered_values_ = 0;
+    data_ = NULL;
+    data_end_ = NULL;
+    stream_ = stream;
+    metadata_ = metadata;
+    num_values_read_ = 0;
+    def_level_ = -1;
+    // See ColumnReader constructor.
+    rep_level_ = max_rep_level() == 0 ? 0 : -1;
+    pos_current_value_ = -1;
+
+    if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
+      RETURN_IF_ERROR(Codec::CreateDecompressor(
+          NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
+    }
+    ClearDictionaryDecoder();
+    return Status::OK();
+  }
+
+  /// Called once when the scanner is complete for final cleanup.
+  void Close() {
+    if (decompressor_.get() != NULL) decompressor_->Close();
+  }
+
+  int64_t total_len() const { return metadata_->total_compressed_size; }
+  int col_idx() const { return node_.col_idx; }
+  THdfsCompression::type codec() const {
+    if (metadata_ == NULL) return THdfsCompression::NONE;
+    return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
+  }
+  MemPool* decompressed_data_pool() const { return decompressed_data_pool_.get(); }
+
+  /// Reads the next definition and repetition levels for this column. Initializes the
+  /// next data page if necessary.
+  virtual bool NextLevels() { return NextLevels<true>(); }
+
+  // TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) if
+  // we know this row can be skipped. This could be very useful with stats, where big
+  // sections can be skipped. Implement that when we can benefit from it.
+
+ protected:
+  // Friend parent scanner so it can perform validation (e.g. ValidateEndOfRowGroup())
+  friend class HdfsParquetScanner;
+
+  // Class members that are accessed for every column should be included up here so they
+  // fit in as few cache lines as possible.
+
+  /// Pointer to start of next value in data page
+  uint8_t* data_;
+
+  /// End of the data page.
+  const uint8_t* data_end_;
+
+  /// Decoder for definition levels.
+  ParquetLevelDecoder def_levels_;
+
+  /// Decoder for repetition levels.
+  ParquetLevelDecoder rep_levels_;
+
+  /// Page encoding for values. Cached here for perf.
+  parquet::Encoding::type page_encoding_;
+
+  /// Num values remaining in the current data page
+  int num_buffered_values_;
+
+  // Less frequently used members that are not accessed in the inner loop should go below
+  // here so they do not occupy precious cache line space.
+
+  /// The number of values seen so far. Updated per data page.
+  int64_t num_values_read_;
+
+  const parquet::ColumnMetaData* metadata_;
+  boost::scoped_ptr<Codec> decompressor_;
+  ScannerContext::Stream* stream_;
+
+  /// Pool to allocate decompression buffers from.
+  boost::scoped_ptr<MemPool> decompressed_data_pool_;
+
+  /// Header for current data page.
+  parquet::PageHeader current_page_header_;
+
+  /// Read the next data page. If a dictionary page is encountered, that will be read and
+  /// this function will continue reading the next data page.
+  Status ReadDataPage();
+
+  /// Tries to move to the next page and buffer more values. Returns false and sets rep_level_,
+  /// def_level_ and pos_current_value_ to -1 if there are no more pages or an error is encountered.
+  bool NextPage();
+
+  /// Implementation for NextLevels().
+  template <bool ADVANCE_REP_LEVEL>
+  bool NextLevels();
+
+  /// Creates a dictionary decoder from values/size. 'decoder' is set to point to a
+  /// dictionary decoder stored in this object. Subclass must implement this. Returns
+  /// an error status if the dictionary values could not be decoded successfully.
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) = 0;
+
+  /// Return true if the column has an initialized dictionary decoder. Subclass must
+  /// implement this.
+  virtual bool HasDictionaryDecoder() = 0;
+
+  /// Clear the dictionary decoder so HasDictionaryDecoder() will return false. Subclass
+  /// must implement this.
+  virtual void ClearDictionaryDecoder() = 0;
+
+  /// Initializes the reader with the data contents. This is the content for the entire
+  /// decompressed data page. Decoders can initialize state from here.
+  virtual Status InitDataPage(uint8_t* data, int size) = 0;
+
+ private:
+  /// Writes the next value into *slot using pool if necessary. Also advances rep_level_
+  /// and def_level_ via NextLevels().
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  template <bool IN_COLLECTION>
+  inline bool ReadSlot(void* slot, MemPool* pool);
+};
+
+/// Collections are not materialized directly in parquet files; only scalar values appear
+/// in the file. CollectionColumnReader uses the definition and repetition levels of child
+/// column readers to figure out the boundaries of each collection in this column.
+class CollectionColumnReader : public ParquetColumnReader {
+ public:
+  CollectionColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : ParquetColumnReader(parent, node, slot_desc) {
+    DCHECK(node_.is_repeated());
+    if (slot_desc != NULL) DCHECK(slot_desc->type().IsCollectionType());
+  }
+
+  virtual ~CollectionColumnReader() { }
+
+  vector<ParquetColumnReader*>* children() { return &children_; }
+
+  virtual bool IsCollectionReader() const { return true; }
+
+  /// The repetition level indicating that the current value is the first in a new
+  /// collection (meaning the last value read was the final item in the previous
+  /// collection).
+  int new_collection_rep_level() const { return max_rep_level() - 1; }
+
+  /// Materializes CollectionValue into tuple slot (if materializing) and advances to next
+  /// value.
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple);
+
+  /// Same as ReadValue but does not advance repetition level. Only valid for columns not
+  /// in collections.
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple);
+
+  /// Advances all child readers to the beginning of the next collection and updates this
+  /// reader's state.
+  virtual bool NextLevels();
+
+  /// This is called once for each row group in the file.
+  void Reset() {
+    def_level_ = -1;
+    rep_level_ = -1;
+    pos_current_value_ = -1;
+  }
+
+ private:
+  /// Column readers of fields contained within this collection. There is at least one
+  /// child reader per collection reader. Child readers either materialize slots in the
+  /// collection item tuples, or there is a single child reader that does not materialize
+  /// any slot and is only used by this reader to read def and rep levels.
+  vector<ParquetColumnReader*> children_;
+
+  /// Updates this reader's def_level_, rep_level_, and pos_current_value_ based on child
+  /// reader's state.
+  void UpdateDerivedState();
+
+  /// Recursively reads from children_ to assemble a single CollectionValue into
+  /// *slot. Also advances rep_level_ and def_level_ via NextLevels().
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  inline bool ReadSlot(void* slot, MemPool* pool);
+};
+
+}
+
+#endif
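
To make the contract above concrete, here is a minimal sketch of how a caller might
drive a scalar (non-collection) column reader over one row group. The helper name and
its parameters are assumptions; only the member calls (Reset(), NeedsSeedingForBatchedReading(),
NextLevels(), ReadNonRepeatedValueBatch(), RowGroupAtEnd()) come from the declarations above.

  // Hypothetical driver loop for a scalar, non-collection column (a sketch, not
  // the scanner's actual implementation). 'metadata', 'stream', 'pool',
  // 'tuple_mem', 'tuple_size' and 'capacity' are assumed to be provided by the
  // enclosing scanner.
  Status ReadOneRowGroup(BaseScalarColumnReader* reader,
      const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream,
      MemPool* pool, uint8_t* tuple_mem, int tuple_size, int capacity) {
    RETURN_IF_ERROR(reader->Reset(metadata, stream));
    // Seed the reader with the first def/rep levels if batched reading needs it.
    bool continue_execution = true;
    if (reader->NeedsSeedingForBatchedReading()) {
      continue_execution = reader->NextLevels();
    }
    while (continue_execution && !reader->RowGroupAtEnd()) {
      int num_values = 0;
      // Materializes up to 'capacity' values into 'tuple_mem'. A false return
      // means execution should stop (parse error, cancellation or limit hit).
      continue_execution = reader->ReadNonRepeatedValueBatch(
          pool, capacity, tuple_size, tuple_mem, &num_values);
      // ... hand the 'num_values' materialized tuples to the output row batch ...
    }
    return Status::OK();
  }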


[5/5] incubator-impala git commit: IMPALA-3845: Split up hdfs-parquet-scanner.cc into more files/components.

Posted by ta...@apache.org.
IMPALA-3845: Split up hdfs-parquet-scanner.cc into more files/components.

This patch refactors hdfs-parquet-scanner.cc into several files.
The new responsibilities of each file/component are roughly as follows:

hdfs-parquet-scanner.h/cc
- Creates column readers and uses them to materialize row batches.
- Evaluates runtime filters and conjuncts, populates row batch queue.

parquet-metadata-utils.h/cc
- Contains utilities for validating Parquet file metadata.
- Parses the schema of a Parquet file into our internal schema
  representation.
- Resolves SchemaPaths (e.g. from a table descriptor) against
  the internal representation of the Parquet file schema.

parquet-column-readers.h/cc
- Contains the per-column data reading, parsing and value
  materialization logic.

Testing: A private core/hdfs run passed.

Change-Id: I4c5fd46f9c1a0ff2a4c30ea5a712fbae17c68f92
Reviewed-on: http://gerrit.cloudera.org:8080/3596
Tested-by: Internal Jenkins
Reviewed-by: Alex Behm <al...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6ee15fad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6ee15fad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6ee15fad

Branch: refs/heads/master
Commit: 6ee15fadedcac9d41f8ad660caf8d4a60267df8e
Parents: baf8fe2
Author: Alex Behm <al...@cloudera.com>
Authored: Tue May 17 10:46:36 2016 -0700
Committer: Taras Bobrovytsky <ta...@apache.org>
Committed: Fri Jul 15 18:27:05 2016 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                |    2 +
 be/src/exec/base-sequence-scanner.cc      |    2 +-
 be/src/exec/hdfs-parquet-scanner.cc       | 2316 +-----------------------
 be/src/exec/hdfs-parquet-scanner.h        |  221 +--
 be/src/exec/hdfs-rcfile-scanner.cc        |    2 +-
 be/src/exec/hdfs-scanner.cc               |   20 -
 be/src/exec/hdfs-scanner.h                |   11 -
 be/src/exec/hdfs-text-scanner.cc          |    3 +-
 be/src/exec/parquet-column-readers.cc     | 1093 +++++++++++
 be/src/exec/parquet-column-readers.h      |  500 +++++
 be/src/exec/parquet-metadata-utils.cc     |  647 +++++++
 be/src/exec/parquet-metadata-utils.h      |  202 +++
 be/src/exec/parquet-scratch-tuple-batch.h |   72 +
 be/src/exec/parquet-version-test.cc       |    7 +-
 be/src/exprs/expr-value.h                 |    2 +-
 be/src/runtime/runtime-state.cc           |   14 +
 be/src/runtime/runtime-state.h            |    6 +
 be/src/util/debug-util.cc                 |    8 +
 be/src/util/debug-util.h                  |   18 +
 19 files changed, 2684 insertions(+), 2462 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 876fc7e..7cf4267 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -62,6 +62,8 @@ add_library(Exec
   hbase-table-scanner.cc
   incr-stats-util.cc
   nested-loop-join-node.cc
+  parquet-column-readers.cc
+  parquet-metadata-utils.cc
   partitioned-aggregation-node.cc
   partitioned-aggregation-node-ir.cc
   partitioned-hash-join-node.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index dc7a983..268fdae 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -124,7 +124,7 @@ Status BaseSequenceScanner::ProcessSplit() {
     header_ = state_->obj_pool()->Add(AllocateFileHeader());
     Status status = ReadFileHeader();
     if (!status.ok()) {
-      RETURN_IF_ERROR(LogOrReturnError(status.msg()));
+      RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
       // We need to complete the ranges for this file.
       CloseFileRanges(stream_->filename());
       return Status::OK();


[2/5] incubator-impala git commit: IMPALA-3845: Split up hdfs-parquet-scanner.cc into more files/components.

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.cc b/be/src/exec/parquet-metadata-utils.cc
new file mode 100644
index 0000000..5f10f62
--- /dev/null
+++ b/be/src/exec/parquet-metadata-utils.cc
@@ -0,0 +1,647 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "exec/parquet-metadata-utils.h"
+
+#include <string>
+#include <sstream>
+#include <vector>
+
+#include <boost/algorithm/string.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "exec/parquet-common.h"
+#include "runtime/runtime-state.h"
+#include "util/debug-util.h"
+
+using std::endl;
+using std::string;
+using std::stringstream;
+using std::vector;
+using strings::Substitute;
+using boost::algorithm::is_any_of;
+using boost::algorithm::split;
+using boost::algorithm::token_compress_on;
+
+namespace impala {
+
+Status ParquetMetadataUtils::ValidateFileVersion(
+    const parquet::FileMetaData& file_metadata, const char* filename) {
+  if (file_metadata.version > PARQUET_CURRENT_VERSION) {
+    stringstream ss;
+    ss << "File: " << filename << " is of an unsupported version. "
+       << "file version: " << file_metadata.version;
+    return Status(ss.str());
+  }
+  return Status::OK();
+}
+
+Status ParquetMetadataUtils::ValidateColumnOffsets(const string& filename,
+    int64_t file_length, const parquet::RowGroup& row_group) {
+  for (int i = 0; i < row_group.columns.size(); ++i) {
+    const parquet::ColumnChunk& col_chunk = row_group.columns[i];
+    int64_t col_start = col_chunk.meta_data.data_page_offset;
+    // The file format requires that if a dictionary page exists, it be before data pages.
+    if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+      if (col_chunk.meta_data.dictionary_page_offset >= col_start) {
+        stringstream ss;
+        ss << "File " << filename << ": metadata is corrupt. "
+            << "Dictionary page (offset=" << col_chunk.meta_data.dictionary_page_offset
+            << ") must come before any data pages (offset=" << col_start << ").";
+        return Status(ss.str());
+      }
+      col_start = col_chunk.meta_data.dictionary_page_offset;
+    }
+    int64_t col_len = col_chunk.meta_data.total_compressed_size;
+    int64_t col_end = col_start + col_len;
+    if (col_end <= 0 || col_end > file_length) {
+      stringstream ss;
+      ss << "File " << filename << ": metadata is corrupt. "
+          << "Column " << i << " has invalid column offsets "
+          << "(offset=" << col_start << ", size=" << col_len << ", "
+          << "file_size=" << file_length << ").";
+      return Status(ss.str());
+    }
+  }
+  return Status::OK();
+}
+
+static bool IsEncodingSupported(parquet::Encoding::type e) {
+  switch (e) {
+    case parquet::Encoding::PLAIN:
+    case parquet::Encoding::PLAIN_DICTIONARY:
+    case parquet::Encoding::BIT_PACKED:
+    case parquet::Encoding::RLE:
+      return true;
+    default:
+      return false;
+  }
+}
+
+Status ParquetMetadataUtils::ValidateColumn(const parquet::FileMetaData& file_metadata,
+    const char* filename, int row_group_idx, int col_idx,
+    const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
+    RuntimeState* state) {
+  const parquet::ColumnChunk& file_data =
+      file_metadata.row_groups[row_group_idx].columns[col_idx];
+
+  // Check the encodings are supported.
+  const vector<parquet::Encoding::type>& encodings = file_data.meta_data.encodings;
+  for (int i = 0; i < encodings.size(); ++i) {
+    if (!IsEncodingSupported(encodings[i])) {
+      stringstream ss;
+      ss << "File '" << filename << "' uses an unsupported encoding: "
+         << PrintEncoding(encodings[i]) << " for column '" << schema_element.name
+         << "'.";
+      return Status(ss.str());
+    }
+  }
+
+  // Check the compression is supported.
+  if (file_data.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED &&
+      file_data.meta_data.codec != parquet::CompressionCodec::SNAPPY &&
+      file_data.meta_data.codec != parquet::CompressionCodec::GZIP) {
+    stringstream ss;
+    ss << "File '" << filename << "' uses an unsupported compression: "
+        << file_data.meta_data.codec << " for column '" << schema_element.name
+        << "'.";
+    return Status(ss.str());
+  }
+
+  // Validation after this point is only if col_reader is reading values.
+  if (slot_desc == NULL) return Status::OK();
+
+  parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[slot_desc->type().type];
+  DCHECK_EQ(type, file_data.meta_data.type)
+      << "Should have been validated in ResolvePath()";
+
+  // Check the decimal scale in the file matches the metastore scale and precision.
+  // We fail the query if the metadata makes it impossible for us to safely read
+  // the file. If we don't require the metadata, we will fail the query if
+  // abort_on_error is true, otherwise we will just log a warning.
+  bool is_converted_type_decimal = schema_element.__isset.converted_type &&
+      schema_element.converted_type == parquet::ConvertedType::DECIMAL;
+  if (slot_desc->type().type == TYPE_DECIMAL) {
+    // We require that the scale and byte length be set.
+    if (schema_element.type != parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' should be a decimal column encoded using FIXED_LEN_BYTE_ARRAY.";
+      return Status(ss.str());
+    }
+
+    if (!schema_element.__isset.type_length) {
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' does not have type_length set.";
+      return Status(ss.str());
+    }
+
+    int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
+    if (schema_element.type_length != expected_len) {
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' has an invalid type length. Expecting: " << expected_len
+         << " len in file: " << schema_element.type_length;
+      return Status(ss.str());
+    }
+
+    if (!schema_element.__isset.scale) {
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' does not have the scale set.";
+      return Status(ss.str());
+    }
+
+    if (schema_element.scale != slot_desc->type().scale) {
+      // TODO: we could allow a mismatch and do a conversion at this step.
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' has a scale that does not match the table metadata scale."
+         << " File metadata scale: " << schema_element.scale
+         << " Table metadata scale: " << slot_desc->type().scale;
+      return Status(ss.str());
+    }
+
+    // The other decimal metadata should be there but we don't need it.
+    if (!schema_element.__isset.precision) {
+      ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename,
+          schema_element.name);
+      RETURN_IF_ERROR(state->LogOrReturnError(msg));
+    } else {
+      if (schema_element.precision != slot_desc->type().precision) {
+        // TODO: we could allow a mismatch and do a conversion at this step.
+        ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION, filename, schema_element.name,
+            schema_element.precision, slot_desc->type().precision);
+        RETURN_IF_ERROR(state->LogOrReturnError(msg));
+      }
+    }
+
+    if (!is_converted_type_decimal) {
+      // TODO: is this validation useful? It is not required at all to read the data and
+      // might only serve to reject otherwise perfectly readable files.
+      ErrorMsg msg(TErrorCode::PARQUET_BAD_CONVERTED_TYPE, filename,
+          schema_element.name);
+      RETURN_IF_ERROR(state->LogOrReturnError(msg));
+    }
+  } else if (schema_element.__isset.scale || schema_element.__isset.precision ||
+      is_converted_type_decimal) {
+    ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename,
+        schema_element.name, slot_desc->type().DebugString());
+    RETURN_IF_ERROR(state->LogOrReturnError(msg));
+  }
+  return Status::OK();
+}
+
+ParquetFileVersion::ParquetFileVersion(const string& created_by) {
+  string created_by_lower = created_by;
+  std::transform(created_by_lower.begin(), created_by_lower.end(),
+      created_by_lower.begin(), ::tolower);
+  is_impala_internal = false;
+
+  vector<string> tokens;
+  split(tokens, created_by_lower, is_any_of(" "), token_compress_on);
+  // Boost always creates at least one token
+  DCHECK_GT(tokens.size(), 0);
+  application = tokens[0];
+
+  if (tokens.size() >= 3 && tokens[1] == "version") {
+    string version_string = tokens[2];
+    // Ignore any trailing non-numeric characters, e.g. a "-cdh5" suffix.
+    int n = version_string.find_first_not_of("0123456789.");
+    string version_string_trimmed = version_string.substr(0, n);
+
+    vector<string> version_tokens;
+    split(version_tokens, version_string_trimmed, is_any_of("."));
+    version.major = version_tokens.size() >= 1 ? atoi(version_tokens[0].c_str()) : 0;
+    version.minor = version_tokens.size() >= 2 ? atoi(version_tokens[1].c_str()) : 0;
+    version.patch = version_tokens.size() >= 3 ? atoi(version_tokens[2].c_str()) : 0;
+
+    if (application == "impala") {
+      if (version_string.find("-internal") != string::npos) is_impala_internal = true;
+    }
+  } else {
+    version.major = 0;
+    version.minor = 0;
+    version.patch = 0;
+  }
+}
+
+bool ParquetFileVersion::VersionLt(int major, int minor, int patch) const {
+  if (version.major < major) return true;
+  if (version.major > major) return false;
+  DCHECK_EQ(version.major, major);
+  if (version.minor < minor) return true;
+  if (version.minor > minor) return false;
+  DCHECK_EQ(version.minor, minor);
+  return version.patch < patch;
+}
+
+bool ParquetFileVersion::VersionEq(int major, int minor, int patch) const {
+  return version.major == major && version.minor == minor && version.patch == patch;
+}
+
+static string PrintRepetitionType(const parquet::FieldRepetitionType::type& t) {
+  switch (t) {
+    case parquet::FieldRepetitionType::REQUIRED: return "required";
+    case parquet::FieldRepetitionType::OPTIONAL: return "optional";
+    case parquet::FieldRepetitionType::REPEATED: return "repeated";
+    default: return "<unknown>";
+  }
+}
+
+static string PrintParquetType(const parquet::Type::type& t) {
+  switch (t) {
+    case parquet::Type::BOOLEAN: return "boolean";
+    case parquet::Type::INT32: return "int32";
+    case parquet::Type::INT64: return "int64";
+    case parquet::Type::INT96: return "int96";
+    case parquet::Type::FLOAT: return "float";
+    case parquet::Type::DOUBLE: return "double";
+    case parquet::Type::BYTE_ARRAY: return "byte_array";
+    case parquet::Type::FIXED_LEN_BYTE_ARRAY: return "fixed_len_byte_array";
+    default: return "<unknown>";
+  }
+}
+
+string SchemaNode::DebugString(int indent) const {
+  stringstream ss;
+  for (int i = 0; i < indent; ++i) ss << " ";
+  ss << PrintRepetitionType(element->repetition_type) << " ";
+  if (element->num_children > 0) {
+    ss << "struct";
+  } else {
+    ss << PrintParquetType(element->type);
+  }
+  ss << " " << element->name << " [i:" << col_idx << " d:" << max_def_level
+     << " r:" << max_rep_level << "]";
+  if (element->num_children > 0) {
+    ss << " {" << endl;
+    for (int i = 0; i < element->num_children; ++i) {
+      ss << children[i].DebugString(indent + 2) << endl;
+    }
+    for (int i = 0; i < indent; ++i) ss << " ";
+    ss << "}";
+  }
+  return ss.str();
+}
+
+Status ParquetSchemaResolver::CreateSchemaTree(const vector<parquet::SchemaElement>& schema,
+    SchemaNode* node) const {
+  int idx = 0;
+  int col_idx = 0;
+  RETURN_IF_ERROR(CreateSchemaTree(schema, 0, 0, 0, &idx, &col_idx, node));
+  if (node->children.empty()) {
+    return Status(Substitute("Invalid file: '$0' has no columns.", filename_));
+  }
+  return Status::OK();
+}
+
+Status ParquetSchemaResolver::CreateSchemaTree(
+    const vector<parquet::SchemaElement>& schema, int max_def_level, int max_rep_level,
+    int ira_def_level, int* idx, int* col_idx, SchemaNode* node)
+    const {
+  if (*idx >= schema.size()) {
+    return Status(Substitute("File $0 corrupt: could not reconstruct schema tree from "
+            "flattened schema in file metadata", filename_));
+  }
+  node->element = &schema[*idx];
+  ++(*idx);
+
+  if (node->element->num_children == 0) {
+    // node is a leaf node, meaning it's materialized in the file and appears in
+    // file_metadata_.row_groups.columns
+    node->col_idx = *col_idx;
+    ++(*col_idx);
+  }
+
+  // def_level_of_immediate_repeated_ancestor does not include this node, so set before
+  // updating ira_def_level
+  node->def_level_of_immediate_repeated_ancestor = ira_def_level;
+
+  if (node->element->repetition_type == parquet::FieldRepetitionType::OPTIONAL) {
+    ++max_def_level;
+  } else if (node->element->repetition_type == parquet::FieldRepetitionType::REPEATED) {
+    ++max_rep_level;
+    // Repeated fields add a definition level. This is used to distinguish between an
+    // empty list and a list with an item in it.
+    ++max_def_level;
+    // node is the new most immediate repeated ancestor
+    ira_def_level = max_def_level;
+  }
+  node->max_def_level = max_def_level;
+  node->max_rep_level = max_rep_level;
+
+  node->children.resize(node->element->num_children);
+  for (int i = 0; i < node->element->num_children; ++i) {
+    RETURN_IF_ERROR(CreateSchemaTree(schema, max_def_level, max_rep_level, ira_def_level,
+        idx, col_idx, &node->children[i]));
+  }
+  return Status::OK();
+}
+
+Status ParquetSchemaResolver::ResolvePath(const SchemaPath& path, SchemaNode** node,
+    bool* pos_field, bool* missing_field) const {
+  *missing_field = false;
+  // First try two-level array encoding.
+  bool missing_field_two_level;
+  Status status_two_level =
+      ResolvePathHelper(TWO_LEVEL, path, node, pos_field, &missing_field_two_level);
+  if (missing_field_two_level) DCHECK(status_two_level.ok());
+  if (status_two_level.ok() && !missing_field_two_level) return Status::OK();
+  // The two-level resolution failed or reported a missing field, try three-level array
+  // encoding.
+  bool missing_field_three_level;
+  Status status_three_level =
+      ResolvePathHelper(THREE_LEVEL, path, node, pos_field, &missing_field_three_level);
+  if (missing_field_three_level) DCHECK(status_three_level.ok());
+  if (status_three_level.ok() && !missing_field_three_level) return Status::OK();
+  // The three-level resolution failed or reported a missing field, try one-level array
+  // encoding.
+  bool missing_field_one_level;
+  Status status_one_level =
+      ResolvePathHelper(ONE_LEVEL, path, node, pos_field, &missing_field_one_level);
+  if (missing_field_one_level) DCHECK(status_one_level.ok());
+  if (status_one_level.ok() && !missing_field_one_level) return Status::OK();
+  // None of the resolutions yielded a node. Set *missing_field to true if any of the
+  // resolutions reported a missing field.
+  if (missing_field_one_level || missing_field_two_level || missing_field_three_level) {
+    *node = NULL;
+    *missing_field = true;
+    return Status::OK();
+  }
+  // All resolutions failed. Log and return the status from the three-level resolution
+  // (which is technically the standard).
+  DCHECK(!status_one_level.ok() && !status_two_level.ok() && !status_three_level.ok());
+  *node = NULL;
+  VLOG_QUERY << status_three_level.msg().msg() << "\n" << GetStackTrace();
+  return status_three_level;
+}
+
+Status ParquetSchemaResolver::ResolvePathHelper(ArrayEncoding array_encoding,
+    const SchemaPath& path, SchemaNode** node, bool* pos_field, bool* missing_field) const {
+  DCHECK(schema_.element != NULL)
+      << "schema_ must be initialized before calling ResolvePath()";
+
+  *pos_field = false;
+  *missing_field = false;
+  *node = const_cast<SchemaNode*>(&schema_);
+  const ColumnType* col_type = NULL;
+
+  // Traverse 'path' and resolve 'node' to the corresponding SchemaNode in 'schema_' (by
+  // ordinal), or set 'node' to NULL if 'path' doesn't exist in this file's schema.
+  for (int i = 0; i < path.size(); ++i) {
+    // Advance '*node' if necessary
+    if (i == 0 || col_type->type != TYPE_ARRAY || array_encoding == THREE_LEVEL) {
+      *node = NextSchemaNode(col_type, path, i, *node, missing_field);
+      if (*missing_field) return Status::OK();
+    } else {
+      // We just resolved an array, meaning *node is set to the repeated field of the
+      // array. Since we are trying to resolve using one- or two-level array encoding, the
+      // repeated field represents both the array and the array's item (i.e. there is no
+      // explicit item field), so we don't advance *node in this case.
+      DCHECK(col_type != NULL);
+      DCHECK_EQ(col_type->type, TYPE_ARRAY);
+      DCHECK(array_encoding == ONE_LEVEL || array_encoding == TWO_LEVEL);
+      DCHECK((*node)->is_repeated());
+    }
+
+    // Advance 'col_type'
+    int table_idx = path[i];
+    col_type = i == 0 ? &tbl_desc_.col_descs()[table_idx].type()
+               : &col_type->children[table_idx];
+
+    // Resolve path[i]
+    if (col_type->type == TYPE_ARRAY) {
+      DCHECK_EQ(col_type->children.size(), 1);
+      RETURN_IF_ERROR(
+          ResolveArray(array_encoding, path, i, node, pos_field, missing_field));
+      if (*missing_field || *pos_field) return Status::OK();
+    } else if (col_type->type == TYPE_MAP) {
+      DCHECK_EQ(col_type->children.size(), 2);
+      RETURN_IF_ERROR(ResolveMap(path, i, node, missing_field));
+      if (*missing_field) return Status::OK();
+    } else if (col_type->type == TYPE_STRUCT) {
+      DCHECK_GT(col_type->children.size(), 0);
+      // Nothing to do for structs
+    } else {
+      DCHECK(!col_type->IsComplexType());
+      DCHECK_EQ(i, path.size() - 1);
+      RETURN_IF_ERROR(ValidateScalarNode(**node, *col_type, path, i));
+    }
+  }
+  DCHECK(*node != NULL);
+  return Status::OK();
+}
+
+SchemaNode* ParquetSchemaResolver::NextSchemaNode(
+    const ColumnType* col_type, const SchemaPath& path, int next_idx, SchemaNode* node,
+    bool* missing_field) const {
+  DCHECK_LT(next_idx, path.size());
+  if (next_idx != 0) DCHECK(col_type != NULL);
+
+  int file_idx;
+  int table_idx = path[next_idx];
+  if (fallback_schema_resolution_ == TParquetFallbackSchemaResolution::type::NAME) {
+    if (next_idx == 0) {
+      // Resolve top-level table column by name.
+      DCHECK_LT(table_idx, tbl_desc_.col_descs().size());
+      const string& name = tbl_desc_.col_descs()[table_idx].name();
+      file_idx = FindChildWithName(node, name);
+    } else if (col_type->type == TYPE_STRUCT) {
+      // Resolve struct field by name.
+      DCHECK_LT(table_idx, col_type->field_names.size());
+      const string& name = col_type->field_names[table_idx];
+      file_idx = FindChildWithName(node, name);
+    } else if (col_type->type == TYPE_ARRAY) {
+      // Arrays have only one child in the file.
+      DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM);
+      file_idx = table_idx;
+    } else {
+      DCHECK_EQ(col_type->type, TYPE_MAP);
+      // Maps have two fields, "key" and "value". These are supposed to be ordered and may
+      // not have the right field names, but try to resolve by name in case they're
+      // switched and otherwise use the order. See
+      // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for
+      // more details.
+      DCHECK(table_idx == SchemaPathConstants::MAP_KEY ||
+             table_idx == SchemaPathConstants::MAP_VALUE);
+      const string& name = table_idx == SchemaPathConstants::MAP_KEY ? "key" : "value";
+      file_idx = FindChildWithName(node, name);
+      if (file_idx >= node->children.size()) {
+        // Couldn't resolve by name, fall back to resolution by position.
+        file_idx = table_idx;
+      }
+    }
+  } else {
+    // Resolution by position.
+    DCHECK_EQ(fallback_schema_resolution_,
+        TParquetFallbackSchemaResolution::type::POSITION);
+    if (next_idx == 0) {
+      // For top-level columns, the first index in a path includes the table's partition
+      // keys.
+      file_idx = table_idx - tbl_desc_.num_clustering_cols();
+    } else {
+      file_idx = table_idx;
+    }
+  }
+
+  if (file_idx >= node->children.size()) {
+    string schema_resolution_mode = "unknown";
+    auto entry = _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.find(
+        fallback_schema_resolution_);
+    if (entry != _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.end()) {
+      schema_resolution_mode = entry->second;
+    }
+    VLOG_FILE << Substitute(
+        "File '$0' does not contain path '$1' (resolving by $2)", filename_,
+        PrintPath(tbl_desc_, path), schema_resolution_mode);
+    *missing_field = true;
+    return NULL;
+  }
+  return &node->children[file_idx];
+}
+
+int ParquetSchemaResolver::FindChildWithName(SchemaNode* node,
+    const string& name) const {
+  int idx;
+  for (idx = 0; idx < node->children.size(); ++idx) {
+    if (node->children[idx].element->name == name) break;
+  }
+  return idx;
+}
+
+// There are three types of array encodings:
+//
+// 1. One-level encoding
+//      A bare repeated field. This is interpreted as a required array of required
+//      items.
+//    Example:
+//      repeated <item-type> item;
+//
+// 2. Two-level encoding
+//      A group containing a single repeated field. This is interpreted as a
+//      <list-repetition> array of required items (<list-repetition> is either
+//      optional or required).
+//    Example:
+//      <list-repetition> group <name> {
+//        repeated <item-type> item;
+//      }
+//
+// 3. Three-level encoding
+//      The "official" encoding according to the parquet spec. A group containing a
+//      single repeated group containing the item field. This is interpreted as a
+//      <list-repetition> array of <item-repetition> items (<list-repetition> and
+//      <item-repetition> are each either optional or required).
+//    Example:
+//      <list-repetition> group <name> {
+//        repeated group list {
+//          <item-repetition> <item-type> item;
+//        }
+//      }
+//
+// We ignore any field annotations or names, making us more permissive than the
+// Parquet spec dictates. Note that in any of the encodings, <item-type> may be a
+// group containing more fields, which corresponds to a complex item type. See
+// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists for
+// more details and examples.
+//
+// This function resolves the array at '*node' assuming one-, two-, or three-level
+// encoding, determined by 'array_encoding'. '*node' is set to the repeated field for all
+// three encodings (unless '*pos_field' or '*missing_field' are set to true).
+Status ParquetSchemaResolver::ResolveArray(ArrayEncoding array_encoding,
+    const SchemaPath& path, int idx, SchemaNode** node, bool* pos_field,
+    bool* missing_field) const {
+  if (array_encoding == ONE_LEVEL) {
+    if (!(*node)->is_repeated()) {
+      ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+          PrintSubPath(tbl_desc_, path, idx), "array", (*node)->DebugString());
+      return Status::Expected(msg);
+    }
+  } else {
+    // In the multi-level case, we always expect the outer group to contain a single
+    // repeated field
+    if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated()) {
+      ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+          PrintSubPath(tbl_desc_, path, idx), "array", (*node)->DebugString());
+      return Status::Expected(msg);
+    }
+    // Set *node to the repeated field
+    *node = &(*node)->children[0];
+  }
+  DCHECK((*node)->is_repeated());
+
+  if (idx + 1 < path.size()) {
+    if (path[idx + 1] == SchemaPathConstants::ARRAY_POS) {
+      // The next index in 'path' is the artificial position field.
+      DCHECK_EQ(path.size(), idx + 2) << "position field cannot have children!";
+      *pos_field = true;
+      *node = NULL;
+      return Status::OK();
+    } else {
+      // The next value in 'path' should be the item index
+      DCHECK_EQ(path[idx + 1], SchemaPathConstants::ARRAY_ITEM);
+    }
+  }
+  return Status::OK();
+}
+
+// According to the parquet spec, map columns are represented like:
+// <map-repetition> group <name> (MAP) {
+//   repeated group key_value {
+//     required <key-type> key;
+//     <value-repetition> <value-type> value;
+//   }
+// }
+// We ignore any field annotations or names, making us more permissive than the
+// Parquet spec dictates. See
+// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for
+// more details.
+Status ParquetSchemaResolver::ResolveMap(const SchemaPath& path, int idx, SchemaNode** node,
+    bool* missing_field) const {
+  if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated() ||
+      (*node)->children[0].children.size() != 2) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), "map", (*node)->DebugString());
+    return Status::Expected(msg);
+  }
+  *node = &(*node)->children[0];
+
+  // The next index in 'path' should be the key or the value.
+  if (idx + 1 < path.size()) {
+    DCHECK(path[idx + 1] == SchemaPathConstants::MAP_KEY ||
+           path[idx + 1] == SchemaPathConstants::MAP_VALUE);
+  }
+  return Status::OK();
+}
+
+Status ParquetSchemaResolver::ValidateScalarNode(const SchemaNode& node,
+    const ColumnType& col_type, const SchemaPath& path, int idx) const {
+  if (!node.children.empty()) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
+    return Status::Expected(msg);
+  }
+  parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[col_type.type];
+  if (type != node.element->type) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
+    return Status::Expected(msg);
+  }
+  return Status::OK();
+}
+
+}
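
To make the level bookkeeping in CreateSchemaTree() concrete, consider a hypothetical
three-level list column (the column names are made up; 'ira' below stands for
def_level_of_immediate_repeated_ancestor):

  optional group addresses (LIST) {     // max_def_level=1, max_rep_level=0, ira=0
    repeated group list {               // max_def_level=2, max_rep_level=1, ira=0
      optional binary element;          // max_def_level=3, max_rep_level=1, ira=2
    }
  }

For the 'element' node a definition level of 3 encodes a non-NULL value, 2 a NULL value,
1 an empty list and 0 a NULL 'addresses' collection. Since its ira is 2, a value is only
materialized when def_level() >= 2, which is exactly the precondition documented on
ParquetColumnReader::ReadValue() in parquet-column-readers.h.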

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-metadata-utils.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.h b/be/src/exec/parquet-metadata-utils.h
new file mode 100644
index 0000000..a07627e
--- /dev/null
+++ b/be/src/exec/parquet-metadata-utils.h
@@ -0,0 +1,202 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_METADATA_UTILS_H
+#define IMPALA_EXEC_PARQUET_METADATA_UTILS_H
+
+#include <string>
+
+#include "runtime/descriptors.h"
+#include "gen-cpp/parquet_types.h"
+
+namespace impala {
+
+class RuntimeState;
+
+class ParquetMetadataUtils {
+ public:
+  /// Checks the version of the given file and returns a non-OK status if
+  /// Impala does not support that version.
+  static Status ValidateFileVersion(const parquet::FileMetaData& file_metadata,
+      const char* filename);
+
+  /// Validate column offsets by checking if the dictionary page comes before the data
+  /// pages and checking if the column offsets lie within the file.
+  static Status ValidateColumnOffsets(const string& filename, int64_t file_length,
+      const parquet::RowGroup& row_group);
+
+  /// Validates the column metadata to make sure this column is supported (e.g. encoding,
+  /// type, etc) and matches the type of the given slot_desc.
+  static Status ValidateColumn(const parquet::FileMetaData& file_metadata,
+      const char* filename, int row_group_idx, int col_idx,
+      const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
+      RuntimeState* state);
+};
+
+struct ParquetFileVersion {
+  /// Application that wrote the file. e.g. "IMPALA"
+  std::string application;
+
+  /// Version of the application that wrote the file, expressed in three parts
+  /// (<major>.<minor>.<patch>). Unspecified parts default to 0, and extra parts are
+  /// ignored. e.g.:
+  /// "1.2.3"    => {1, 2, 3}
+  /// "1.2"      => {1, 2, 0}
+  /// "1.2-cdh5" => {1, 2, 0}
+  struct {
+    int major;
+    int minor;
+    int patch;
+  } version;
+
+  /// If true, this file was generated by an Impala internal release
+  bool is_impala_internal;
+
+  ParquetFileVersion() : is_impala_internal(false) { }
+
+  /// Parses the version from the created_by string
+  ParquetFileVersion(const std::string& created_by);
+
+  /// Returns true if version is strictly less than <major>.<minor>.<patch>
+  bool VersionLt(int major, int minor = 0, int patch = 0) const;
+
+  /// Returns true if version is equal to <major>.<minor>.<patch>
+  bool VersionEq(int major, int minor, int patch) const;
+};
+
+/// Internal representation of a Parquet schema (including nested-type columns).
+struct SchemaNode {
+  /// The corresponding schema element defined in the file metadata
+  const parquet::SchemaElement* element;
+
+  /// The index into the RowGroup::columns list if this column is materialized in the
+  /// file (i.e. it's a scalar type). -1 for nested types.
+  int col_idx;
+
+  /// The maximum definition level of this column, i.e., the definition level that
+  /// corresponds to a non-NULL value. Valid values are >= 0.
+  int max_def_level;
+
+  /// The maximum repetition level of this column. Valid values are >= 0.
+  int max_rep_level;
+
+  /// The definition level of the most immediate ancestor of this node with repeated
+  /// field repetition type. 0 if there are no repeated ancestors.
+  int def_level_of_immediate_repeated_ancestor;
+
+  /// Any nested schema nodes. Empty for non-nested types.
+  std::vector<SchemaNode> children;
+
+  SchemaNode() : element(NULL), col_idx(-1), max_def_level(-1), max_rep_level(-1),
+                 def_level_of_immediate_repeated_ancestor(-1) { }
+
+  std::string DebugString(int indent = 0) const;
+
+  bool is_repeated() const {
+    return element->repetition_type == parquet::FieldRepetitionType::REPEATED;
+  }
+};
+
+/// Utility class to resolve SchemaPaths (e.g., from a table descriptor) against a
+/// Parquet file schema. Supports resolution by field index or by field name.
+class ParquetSchemaResolver {
+ public:
+  ParquetSchemaResolver(const HdfsTableDescriptor& tbl_desc,
+      TParquetFallbackSchemaResolution::type fallback_schema_resolution)
+    : tbl_desc_(tbl_desc),
+      fallback_schema_resolution_(fallback_schema_resolution),
+      filename_(NULL) {
+  }
+
+  /// Parses the schema of the given file metadata into an internal schema
+  /// representation used in path resolution. Remembers the filename for error
+  /// reporting. Returns a non-OK status if the Parquet schema could not be parsed.
+  Status Init(const parquet::FileMetaData* file_metadata, const char* filename) {
+    DCHECK(filename != NULL);
+    filename_ = filename;
+    return CreateSchemaTree(file_metadata->schema, &schema_);
+  }
+
+  /// Traverses 'schema_' according to 'path', returning the result in 'node'. If 'path'
+  /// does not exist in this file's schema, 'missing_field' is set to true and
+  /// Status::OK() is returned, otherwise 'missing_field' is set to false. If 'path'
+  /// resolves to a collection position field, *pos_field is set to true. Otherwise
+  /// 'pos_field' is set to false. Returns a non-OK status if 'path' cannot be resolved
+  /// against the file's schema (e.g., unrecognized collection schema).
+  ///
+  /// Tries to resolve assuming a one-, two-, or three-level array encoding in
+  /// 'schema_'. Returns a bad status if resolution fails in all cases.
+  Status ResolvePath(const SchemaPath& path, SchemaNode** node, bool* pos_field,
+      bool* missing_field) const;
+
+ private:
+  /// Unflattens the schema metadata from a Parquet file's metadata and converts it to our
+  /// SchemaNode representation. Returns the result in 'node' unless an error status is
+  /// returned. Does not set the slot_desc field of any SchemaNode.
+  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
+      SchemaNode* node) const;
+
+  /// Recursive implementation used internally by the above CreateSchemaTree() function.
+  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
+      int max_def_level, int max_rep_level, int ira_def_level, int* idx, int* col_idx,
+      SchemaNode* node) const;
+
+  /// The 'array_encoding' parameter determines whether to assume one-, two-, or
+  /// three-level array encoding. The returned status is not logged (i.e. it's an expected
+  /// error).
+  enum ArrayEncoding {
+    ONE_LEVEL,
+    TWO_LEVEL,
+    THREE_LEVEL
+  };
+
+  Status ResolvePathHelper(ArrayEncoding array_encoding, const SchemaPath& path,
+      SchemaNode** node, bool* pos_field, bool* missing_field) const;
+
+  /// Helper functions for ResolvePathHelper().
+
+  /// Advances 'node' to one of its children based on path[next_idx] and
+  /// 'col_type'. 'col_type' is NULL if 'node' is the root node, otherwise it's the type
+  /// associated with 'node'. Returns the child node or sets 'missing_field' to true.
+  SchemaNode* NextSchemaNode(const ColumnType* col_type, const SchemaPath& path,
+      int next_idx, SchemaNode* node, bool* missing_field) const;
+
+  /// Returns the index of 'node's child with 'name', or the number of children if not
+  /// found.
+  int FindChildWithName(SchemaNode* node, const string& name) const;
+
+  /// The ResolvePathHelper() logic for arrays.
+  Status ResolveArray(ArrayEncoding array_encoding, const SchemaPath& path, int idx,
+    SchemaNode** node, bool* pos_field, bool* missing_field) const;
+
+  /// The ResolvePathHelper() logic for maps.
+  Status ResolveMap(const SchemaPath& path, int idx, SchemaNode** node,
+      bool* missing_field) const;
+
+  /// The ResolvePathHelper() logic for scalars (just does validation since there's no
+  /// more actual work to be done).
+  Status ValidateScalarNode(const SchemaNode& node, const ColumnType& col_type,
+      const SchemaPath& path, int idx) const;
+
+  const HdfsTableDescriptor& tbl_desc_;
+  const TParquetFallbackSchemaResolution::type fallback_schema_resolution_;
+  const char* filename_;
+
+  /// Root node of our internal schema representation populated in Init().
+  SchemaNode schema_;
+};
+
+} // impala namespace
+
+#endif
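
A minimal sketch of how a scanner is expected to use this resolver. The variables
'tbl_desc', 'fallback_resolution', 'file_metadata', 'filename' and 'col_path' are
assumptions standing in for state the scanner already has; only the resolver calls
themselves come from this header:

  ParquetSchemaResolver resolver(tbl_desc, fallback_resolution);
  RETURN_IF_ERROR(resolver.Init(&file_metadata, filename));

  SchemaNode* node = NULL;
  bool pos_field = false;
  bool missing_field = false;
  RETURN_IF_ERROR(resolver.ResolvePath(col_path, &node, &pos_field, &missing_field));
  if (missing_field) {
    // The column does not exist in this file: materialize NULLs for the slot.
  } else if (pos_field) {
    // The path names the artificial position field of a collection.
  } else {
    // 'node' points at the resolved SchemaNode; create a column reader for it.
  }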

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-scratch-tuple-batch.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-scratch-tuple-batch.h b/be/src/exec/parquet-scratch-tuple-batch.h
new file mode 100644
index 0000000..f2f9794
--- /dev/null
+++ b/be/src/exec/parquet-scratch-tuple-batch.h
@@ -0,0 +1,72 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H
+#define IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H
+
+#include "runtime/descriptors.h"
+#include "runtime/row-batch.h"
+
+namespace impala {
+
+/// Helper struct that holds a batch of tuples allocated from a mem pool, as well
+/// as state associated with iterating over its tuples and transferring
+/// them to an output batch in TransferScratchTuples().
+struct ScratchTupleBatch {
+  // Memory backing the batch of tuples. Allocated from batch's tuple data pool.
+  uint8_t* tuple_mem;
+  // Keeps track of the current tuple index.
+  int tuple_idx;
+  // Number of valid tuples in tuple_mem.
+  int num_tuples;
+  // Cached for convenient access.
+  const int tuple_byte_size;
+
+  // Helper batch for safely allocating tuple_mem from its tuple data pool using
+  // ResizeAndAllocateTupleBuffer().
+  RowBatch batch;
+
+  ScratchTupleBatch(
+      const RowDescriptor& row_desc, int batch_size, MemTracker* mem_tracker)
+    : tuple_mem(NULL),
+      tuple_idx(0),
+      num_tuples(0),
+      tuple_byte_size(row_desc.GetRowSize()),
+      batch(row_desc, batch_size, mem_tracker) {
+    DCHECK_EQ(row_desc.tuple_descriptors().size(), 1);
+  }
+
+  Status Reset(RuntimeState* state) {
+    tuple_idx = 0;
+    num_tuples = 0;
+    // Buffer size is not needed.
+    int64_t buffer_size;
+    RETURN_IF_ERROR(batch.ResizeAndAllocateTupleBuffer(state, &buffer_size, &tuple_mem));
+    return Status::OK();
+  }
+
+  inline Tuple* GetTuple(int tuple_idx) const {
+    return reinterpret_cast<Tuple*>(tuple_mem + tuple_idx * tuple_byte_size);
+  }
+
+  inline MemPool* mem_pool() { return batch.tuple_data_pool(); }
+  inline int capacity() const { return batch.capacity(); }
+  inline uint8_t* CurrTuple() const { return tuple_mem + tuple_idx * tuple_byte_size; }
+  inline uint8_t* TupleEnd() const { return tuple_mem + num_tuples * tuple_byte_size; }
+  inline bool AtEnd() const { return tuple_idx == num_tuples; }
+};
+
+}
+
+#endif
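
A minimal sketch of the materialize-then-transfer pattern this struct supports. The
variables 'row_desc', 'batch_size', 'mem_tracker', 'state' and 'reader' are assumptions
standing in for scanner state; the member accesses follow the struct definition above:

  ScratchTupleBatch scratch(row_desc, batch_size, mem_tracker);
  RETURN_IF_ERROR(scratch.Reset(state));

  // Column readers materialize up to capacity() tuples into scratch.tuple_mem.
  // A false return means execution should stop; error handling is omitted here.
  int num_values = 0;
  if (reader->ReadNonRepeatedValueBatch(scratch.mem_pool(), scratch.capacity(),
          scratch.tuple_byte_size, scratch.tuple_mem, &num_values)) {
    scratch.num_tuples = num_values;
  }

  // The scanner then walks the scratch tuples, evaluating conjuncts and copying
  // surviving tuples into the output row batch until AtEnd() is true.
  while (!scratch.AtEnd()) {
    Tuple* tuple = scratch.GetTuple(scratch.tuple_idx);
    // ... evaluate conjuncts against 'tuple' and attach it to the batch if they pass ...
    ++scratch.tuple_idx;
  }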

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-version-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-version-test.cc b/be/src/exec/parquet-version-test.cc
index c159205..9ead7b6 100644
--- a/be/src/exec/parquet-version-test.cc
+++ b/be/src/exec/parquet-version-test.cc
@@ -17,7 +17,8 @@
 #include <iostream>
 #include <limits.h>
 #include <gtest/gtest.h>
-#include "exec/hdfs-parquet-scanner.h"
+#include "exec/parquet-metadata-utils.h"
+#include "util/cpu-info.h"
 
 #include "common/names.h"
 
@@ -26,7 +27,7 @@ namespace impala {
 void CheckVersionParse(const string& s, const string& expected_application,
     int expected_major, int expected_minor, int expected_patch,
     bool expected_is_internal) {
-  HdfsParquetScanner::FileVersion v(s);
+  ParquetFileVersion v(s);
   EXPECT_EQ(v.application, expected_application) << "String: " << s;
   EXPECT_EQ(v.version.major, expected_major) << "String: " << s;
   EXPECT_EQ(v.version.minor, expected_minor) << "String: " << s;
@@ -62,7 +63,7 @@ TEST(ParquetVersionTest, Parsing) {
 }
 
 TEST(ParquetVersionTest, Comparisons) {
-  HdfsParquetScanner::FileVersion v("foo version 1.2.3");
+  ParquetFileVersion v("foo version 1.2.3");
   EXPECT_TRUE(v.VersionEq(1, 2, 3));
   EXPECT_FALSE(v.VersionEq(1, 2, 4));
   EXPECT_TRUE(v.VersionLt(3, 2, 1));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exprs/expr-value.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-value.h b/be/src/exprs/expr-value.h
index 93b5f83..2cd2d58 100644
--- a/be/src/exprs/expr-value.h
+++ b/be/src/exprs/expr-value.h
@@ -17,7 +17,7 @@
 
 #include "runtime/collection-value.h"
 #include "runtime/decimal-value.h"
-#include "runtime/string-value.h"
+#include "runtime/string-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "util/decimal-util.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index afbe3b6..7a467be 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -225,6 +225,20 @@ void RuntimeState::GetUnreportedErrors(ErrorLogMap* new_errors) {
   ClearErrorMap(error_log_);
 }
 
+Status RuntimeState::LogOrReturnError(const ErrorMsg& message) {
+  DCHECK_NE(message.error(), TErrorCode::OK);
+  // If abort_on_error=true or the error requires execution to stop immediately,
+  // return an error status.
+  if (abort_on_error() ||
+      message.error() == TErrorCode::MEM_LIMIT_EXCEEDED ||
+      message.error() == TErrorCode::CANCELLED) {
+    return Status(message);
+  }
+  // Otherwise, add the error to the error log and continue.
+  LogError(message);
+  return Status::OK();
+}
+
 void RuntimeState::LogMemLimitExceeded(const MemTracker* tracker,
     int64_t failed_allocation_size) {
   DCHECK_GE(failed_allocation_size, 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index a5e560e..d6c766c 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -209,6 +209,12 @@ class RuntimeState {
   /// be sent back to the coordinator
   void GetUnreportedErrors(ErrorLogMap* new_errors);
 
+  /// Given an error message, determine whether execution should be aborted and, if so,
+  /// return the corresponding error status. Otherwise, log the error and return
+  /// Status::OK(). Execution is aborted if the ABORT_ON_ERROR query option is set to
+  /// true or the error is not recoverable and should be handled upstream.
+  Status LogOrReturnError(const ErrorMsg& message);
+
   bool is_cancelled() const { return is_cancelled_; }
   void set_is_cancelled(bool v) { is_cancelled_ = v; }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index c3f8c94..5ac96e6 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -269,6 +269,14 @@ string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path) {
   return ss.str();
 }
 
+string PrintSubPath(const TableDescriptor& tbl_desc, const SchemaPath& path,
+    int end_path_idx) {
+  DCHECK_GE(end_path_idx, 0);
+  SchemaPath::const_iterator subpath_end = path.begin() + end_path_idx + 1;
+  SchemaPath subpath(path.begin(), subpath_end);
+  return PrintPath(tbl_desc, subpath);
+}
+
 string PrintNumericPath(const SchemaPath& path) {
   stringstream ss;
   ss << "[";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/util/debug-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index c9550dc..48e8c27 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -70,8 +70,12 @@ std::string PrintAsHex(const char* bytes, int64_t len);
 std::string PrintTMetricKind(const TMetricKind::type& type);
 std::string PrintTUnit(const TUnit::type& type);
 std::string PrintTImpalaQueryOptions(const TImpalaQueryOptions::type& type);
+
 /// Returns the fully qualified path, e.g. "database.table.array_col.item.field"
 std::string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path);
+/// Same as PrintPath(), but truncates the path after the given 'end_path_idx'.
+std::string PrintSubPath(const TableDescriptor& tbl_desc, const SchemaPath& path,
+    int end_path_idx);
 /// Returns the numeric path without column/field names, e.g. "[0,1,2]"
 std::string PrintNumericPath(const SchemaPath& path);
 
@@ -98,6 +102,20 @@ std::string GetVersionString(bool compact = false);
 /// for recursive calls.
 std::string GetStackTrace();
 
+// FILE_CHECKs verify conditions that we expect to be true but that could fail due to a
+// malformed input file. This distinguishes them from DCHECKs, which verify conditions
+// that are true unless there is a bug in Impala. We would ideally always return a bad
+// Status instead of failing a FILE_CHECK, but in many cases we use FILE_CHECK because
+// doing the check in a release build has a performance cost, or simply because of
+// legacy code.
+#define FILE_CHECK(a) DCHECK(a)
+#define FILE_CHECK_EQ(a, b) DCHECK_EQ(a, b)
+#define FILE_CHECK_NE(a, b) DCHECK_NE(a, b)
+#define FILE_CHECK_GT(a, b) DCHECK_GT(a, b)
+#define FILE_CHECK_LT(a, b) DCHECK_LT(a, b)
+#define FILE_CHECK_GE(a, b) DCHECK_GE(a, b)
+#define FILE_CHECK_LE(a, b) DCHECK_LE(a, b)
+
 }
 
 #endif
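
An illustrative (hypothetical) use of these macros in file-parsing code: the condition
depends on the bytes of the input file rather than on Impala's own logic, but the check
sits on a hot decoding path, so doing it unconditionally and returning a Status would be
too costly:

  #include <cstdint>
  #include <cstring>

  void DecodeFixedLenValue(const uint8_t* buffer, const uint8_t* buffer_end,
      int byte_size, uint8_t* slot) {
    // A well-formed file always leaves at least 'byte_size' bytes in the buffer here;
    // a malformed file could violate this, which is why FILE_CHECK is used, not DCHECK.
    FILE_CHECK_LE(buffer + byte_size, buffer_end);
    memcpy(slot, buffer, byte_size);
  }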