Posted to commits@impala.apache.org by ta...@apache.org on 2018/11/27 02:01:52 UTC

[02/14] impala git commit: IMPALA-7869: break up parquet-column-readers.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-level-decoder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-level-decoder.h b/be/src/exec/parquet/parquet-level-decoder.h
new file mode 100644
index 0000000..2e0c24e
--- /dev/null
+++ b/be/src/exec/parquet/parquet-level-decoder.h
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <limits>
+#include <string>
+
+#include "common/status.h"
+#include "gen-cpp/parquet_types.h"
+#include "util/rle-encoding.h"
+
+namespace impala {
+
+class MemPool;
+
+/// Constants used instead of actual levels to indicate special conditions.
+class ParquetLevel {
+ public:
+  /// The rep and def levels are set to this value to indicate the end of a row group.
+  static const int16_t ROW_GROUP_END = std::numeric_limits<int16_t>::min();
+  /// Indicates an invalid definition or repetition level.
+  static const int16_t INVALID_LEVEL = -1;
+  /// Indicates an invalid position value.
+  static const int16_t INVALID_POS = -1;
+};
+
+/// Decoder for encoded Parquet levels. Only supports the RLE encoding, not the deprecated
+/// BIT_PACKED encoding. Optionally reads, decodes, and caches level values in batches.
+/// Level values are unsigned 8-bit integers because we support a maximum nesting
+/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
+/// populating the level cache (e.g., with RLE we can memset() repeated values).
+class ParquetLevelDecoder {
+ public:
+  ParquetLevelDecoder(bool is_def_level_decoder)
+    : decoding_error_code_(is_def_level_decoder ? TErrorCode::PARQUET_DEF_LEVEL_ERROR :
+                                                  TErrorCode::PARQUET_REP_LEVEL_ERROR) {}
+
+  /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
+  /// encoding requires reading metadata from the page header. 'cache_size' will be
+  /// rounded up to a multiple of 32 internally.
+  Status Init(const std::string& filename, parquet::Encoding::type encoding,
+      MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size);
+
+  /// Returns the next level or INVALID_LEVEL if there was an error. Not as efficient
+  /// as batched methods.
+  inline int16_t ReadLevel();
+
+  /// If the next value is part of a repeated run and is not cached, return the length
+  /// of the repeated run. A max level of 0 is treated as an arbitrarily long run of
+  /// zeroes, so this returns numeric_limits<int32_t>::max(). Otherwise return 0.
+  inline int32_t NextRepeatedRunLength();
+
+  /// Get the value of the repeated run (if NextRepeatedRunLength() > 0) and consume
+  /// 'num_to_consume' items in the run. Not valid to call if there are cached levels
+  /// that have not been consumed.
+  inline uint8_t GetRepeatedValue(uint32_t num_to_consume);
+
+  /// Decodes and caches the next batch of levels given that there are 'vals_remaining'
+  /// values left to decode in the page. Resets members associated with the cache.
+  /// Returns a non-ok status if there was a problem decoding a level, if a level was
+  /// encountered with a value greater than max_level_, or if fewer than
+  /// min(CacheSize(), vals_remaining) levels could be read, which indicates that the
+  /// input did not have the expected number of values. Only valid to call when
+  /// the cache has been exhausted, i.e. CacheHasNext() is false.
+  Status CacheNextBatch(int vals_remaining);
+
+  /// Functions for working with the level cache.
+  bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }
+  uint8_t CacheGetNext() {
+    DCHECK_LT(cached_level_idx_, num_cached_levels_);
+    return cached_levels_[cached_level_idx_++];
+  }
+  void CacheSkipLevels(int num_levels) {
+    DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_);
+    cached_level_idx_ += num_levels;
+  }
+  int CacheSize() const { return num_cached_levels_; }
+  int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
+  int CacheCurrIdx() const { return cached_level_idx_; }
+
+ private:
+  /// Initializes members associated with the level cache. Allocates memory for
+  /// the cache from pool, if necessary.
+  Status InitCache(MemPool* pool, int cache_size);
+
+  /// Decodes and writes a batch of levels into the cache. Returns true and sets
+  /// the number of values written to the cache via *num_cached_levels if no errors
+  /// are encountered. *num_cached_levels is < 'batch_size' in this case iff the
+  /// end of input was hit without any other errors. Returns false if there was an
+  /// error decoding a level or if there was an invalid level value greater than
+  /// 'max_level_'. Only valid to call when the cache has been exhausted, i.e.
+  /// CacheHasNext() is false.
+  bool FillCache(int batch_size, int* num_cached_levels);
+
+  /// RLE decoder, used if max_level_ > 0.
+  RleBatchDecoder<uint8_t> rle_decoder_;
+
+  /// Buffer for a batch of levels. The memory is allocated and owned by a pool passed
+  /// in Init().
+  uint8_t* cached_levels_ = nullptr;
+
+  /// Number of valid level values in the cache.
+  int num_cached_levels_ = 0;
+
+  /// Current index into cached_levels_.
+  int cached_level_idx_ = 0;
+
+  /// For error checking and reporting.
+  int max_level_ = 0;
+
+  /// Number of level values cached_levels_ has memory allocated for. Always
+  /// a multiple of 32 to allow reading directly from 'rle_decoder_' in batches.
+  int cache_size_ = 0;
+
+  /// Name of the parquet file. Used for reporting level decoding errors.
+  std::string filename_;
+
+  /// Error code to use when reporting level decoding errors.
+  TErrorCode::type decoding_error_code_;
+};
+
+inline int16_t ParquetLevelDecoder::ReadLevel() {
+  if (UNLIKELY(!CacheHasNext())) {
+    if (UNLIKELY(!FillCache(cache_size_, &num_cached_levels_))) {
+      return ParquetLevel::INVALID_LEVEL;
+    }
+    DCHECK_GE(num_cached_levels_, 0);
+    if (UNLIKELY(num_cached_levels_ == 0)) {
+      return ParquetLevel::INVALID_LEVEL;
+    }
+  }
+  return CacheGetNext();
+}
+
+inline int32_t ParquetLevelDecoder::NextRepeatedRunLength() {
+  if (CacheHasNext()) return 0;
+  // Treat always-zero levels as an infinitely long run of zeroes. Return the maximum
+  // run length allowed by the Parquet standard.
+  if (max_level_ == 0) return std::numeric_limits<int32_t>::max();
+  return rle_decoder_.NextNumRepeats();
+}
+
+inline uint8_t ParquetLevelDecoder::GetRepeatedValue(uint32_t num_to_consume) {
+  DCHECK(!CacheHasNext());
+  // Treat always-zero levels as an infinitely long run of zeroes.
+  if (max_level_ == 0) return 0;
+  return rle_decoder_.GetRepeatedValue(num_to_consume);
+}
+
+} // namespace impala
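
For context on the batched API above: a typical caller refills the cache with
CacheNextBatch() and drains it with CacheGetNext(), falling back to the
repeated-run methods (NextRepeatedRunLength()/GetRepeatedValue()) when a long
run can be consumed without materializing individual levels. The sketch below
is a hypothetical helper, not part of this patch; it assumes the decoder was
already Init()'ed for the current page and that 'max_def_level' is the
column's maximum definition level.

    #include <algorithm>
    #include "exec/parquet/parquet-level-decoder.h"

    namespace impala {

    // Counts how many of the next 'num_values' definition levels mark a
    // non-NULL value, i.e. equal 'max_def_level'. Illustration only.
    Status CountDefinedValues(ParquetLevelDecoder* decoder, int num_values,
        uint8_t max_def_level, int64_t* num_defined) {
      *num_defined = 0;
      int remaining = num_values;
      while (remaining > 0) {
        if (!decoder->CacheHasNext()) {
          // Refill the cache; fails if the page holds fewer levels than expected.
          RETURN_IF_ERROR(decoder->CacheNextBatch(remaining));
        }
        // Consume whatever the cache currently holds, up to 'remaining'.
        int n = std::min(remaining, decoder->CacheRemaining());
        for (int i = 0; i < n; ++i) {
          if (decoder->CacheGetNext() == max_def_level) ++(*num_defined);
        }
        remaining -= n;
      }
      return Status::OK();
    }

    } // namespace impala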

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc b/be/src/exec/parquet/parquet-metadata-utils.cc
new file mode 100644
index 0000000..7cbfeda
--- /dev/null
+++ b/be/src/exec/parquet/parquet-metadata-utils.cc
@@ -0,0 +1,733 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet/parquet-metadata-utils.h"
+
+#include <strings.h>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <boost/algorithm/string.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "exec/parquet/parquet-column-stats.h"
+#include "exec/parquet/parquet-common.h"
+#include "runtime/runtime-state.h"
+#include "util/debug-util.h"
+
+#include "common/names.h"
+
+using boost::algorithm::is_any_of;
+using boost::algorithm::split;
+using boost::algorithm::token_compress_on;
+
+namespace impala {
+
+namespace {
+
+const map<PrimitiveType, set<parquet::Type::type>> SUPPORTED_PHYSICAL_TYPES = {
+    {PrimitiveType::INVALID_TYPE, {parquet::Type::BOOLEAN}},
+    {PrimitiveType::TYPE_NULL, {parquet::Type::BOOLEAN}},
+    {PrimitiveType::TYPE_BOOLEAN, {parquet::Type::BOOLEAN}},
+    {PrimitiveType::TYPE_TINYINT, {parquet::Type::INT32}},
+    {PrimitiveType::TYPE_SMALLINT, {parquet::Type::INT32}},
+    {PrimitiveType::TYPE_INT, {parquet::Type::INT32}},
+    {PrimitiveType::TYPE_BIGINT, {parquet::Type::INT32, parquet::Type::INT64}},
+    {PrimitiveType::TYPE_FLOAT, {parquet::Type::FLOAT}},
+    {PrimitiveType::TYPE_DOUBLE, {parquet::Type::INT32, parquet::Type::FLOAT,
+        parquet::Type::DOUBLE}},
+    {PrimitiveType::TYPE_TIMESTAMP, {parquet::Type::INT96}},
+    {PrimitiveType::TYPE_STRING, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_DATE, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_DATETIME, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_BINARY, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_DECIMAL, {parquet::Type::INT32, parquet::Type::INT64,
+        parquet::Type::FIXED_LEN_BYTE_ARRAY, parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_CHAR, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_VARCHAR, {parquet::Type::BYTE_ARRAY}},
+};
+
+/// Physical types that are only supported with specific converted types.
+const map<PrimitiveType, set<pair<parquet::Type::type, parquet::ConvertedType::type>>>
+    SUPPORTED_CONVERTED_TYPES = {
+    {PrimitiveType::TYPE_TIMESTAMP,
+        {{parquet::Type::INT64, parquet::ConvertedType::TIMESTAMP_MICROS},
+         {parquet::Type::INT64, parquet::ConvertedType::TIMESTAMP_MILLIS}}}};
+} // anonymous namespace
+
+/// Returns true if 'parquet_type' is a supported physical encoding for the Impala
+/// primitive type, false otherwise. Some physical types are accepted only for certain
+/// converted types.
+bool IsSupportedType(PrimitiveType impala_type,
+    const parquet::SchemaElement& element) {
+  auto encodings = SUPPORTED_PHYSICAL_TYPES.find(impala_type);
+  DCHECK(encodings != SUPPORTED_PHYSICAL_TYPES.end());
+  parquet::Type::type parquet_type = element.type;
+  if (encodings->second.find(parquet_type) != encodings->second.end()) return true;
+
+  if (!element.__isset.converted_type) return false;
+  parquet::ConvertedType::type converted_type = element.converted_type;
+  auto converted_types = SUPPORTED_CONVERTED_TYPES.find(impala_type);
+  if (converted_types == SUPPORTED_CONVERTED_TYPES.end()) return false;
+  if (converted_types->second.find({parquet_type, converted_type})
+      != converted_types->second.end()) return true;
+
+  return false;
+}
+
+// Needs to be in sync with the order of enum values declared in TParquetArrayResolution.
+const std::vector<ParquetSchemaResolver::ArrayEncoding>
+    ParquetSchemaResolver::ORDERED_ARRAY_ENCODINGS[] =
+        {{ParquetSchemaResolver::THREE_LEVEL, ParquetSchemaResolver::ONE_LEVEL},
+         {ParquetSchemaResolver::TWO_LEVEL, ParquetSchemaResolver::ONE_LEVEL},
+         {ParquetSchemaResolver::TWO_LEVEL, ParquetSchemaResolver::THREE_LEVEL,
+             ParquetSchemaResolver::ONE_LEVEL}};
+
+Status ParquetMetadataUtils::ValidateFileVersion(
+    const parquet::FileMetaData& file_metadata, const char* filename) {
+  if (file_metadata.version > PARQUET_CURRENT_VERSION) {
+    stringstream ss;
+    ss << "File: " << filename << " is of an unsupported version. "
+       << "file version: " << file_metadata.version;
+    return Status(ss.str());
+  }
+  return Status::OK();
+}
+
+Status ParquetMetadataUtils::ValidateColumnOffsets(const string& filename,
+    int64_t file_length, const parquet::RowGroup& row_group) {
+  for (int i = 0; i < row_group.columns.size(); ++i) {
+    const parquet::ColumnChunk& col_chunk = row_group.columns[i];
+    RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length,
+        col_chunk.meta_data.data_page_offset, "data page offset"));
+    int64_t col_start = col_chunk.meta_data.data_page_offset;
+    // The file format requires that if a dictionary page exists, it be before data pages.
+    if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+      RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length,
+            col_chunk.meta_data.dictionary_page_offset, "dictionary page offset"));
+      if (col_chunk.meta_data.dictionary_page_offset >= col_start) {
+        return Status(Substitute("Parquet file '$0': metadata is corrupt. Dictionary "
+            "page (offset=$1) must come before any data pages (offset=$2).",
+            filename, col_chunk.meta_data.dictionary_page_offset, col_start));
+      }
+      col_start = col_chunk.meta_data.dictionary_page_offset;
+    }
+    int64_t col_len = col_chunk.meta_data.total_compressed_size;
+    int64_t col_end = col_start + col_len;
+    if (col_end <= 0 || col_end > file_length) {
+      return Status(Substitute("Parquet file '$0': metadata is corrupt. Column $1 has "
+          "invalid column offsets (offset=$2, size=$3, file_size=$4).", filename, i,
+          col_start, col_len, file_length));
+    }
+  }
+  return Status::OK();
+}
+
+Status ParquetMetadataUtils::ValidateOffsetInFile(const string& filename, int col_idx,
+    int64_t file_length, int64_t offset, const string& offset_name) {
+  if (offset < 0 || offset >= file_length) {
+    return Status(Substitute("File '$0': metadata is corrupt. Column $1 has invalid "
+        "$2 (offset=$3 file_size=$4).", filename, col_idx, offset_name, offset,
+        file_length));
+  }
+  return Status::OK();
+}
+
+static bool IsEncodingSupported(parquet::Encoding::type e) {
+  switch (e) {
+    case parquet::Encoding::PLAIN:
+    case parquet::Encoding::PLAIN_DICTIONARY:
+    case parquet::Encoding::BIT_PACKED:
+    case parquet::Encoding::RLE:
+      return true;
+    default:
+      return false;
+  }
+}
+
+Status ParquetMetadataUtils::ValidateRowGroupColumn(
+    const parquet::FileMetaData& file_metadata, const char* filename, int row_group_idx,
+    int col_idx, const parquet::SchemaElement& schema_element, RuntimeState* state) {
+  const parquet::ColumnMetaData& col_chunk_metadata =
+      file_metadata.row_groups[row_group_idx].columns[col_idx].meta_data;
+
+  // Check the encodings are supported.
+  const vector<parquet::Encoding::type>& encodings = col_chunk_metadata.encodings;
+  for (int i = 0; i < encodings.size(); ++i) {
+    if (!IsEncodingSupported(encodings[i])) {
+      return Status(Substitute("File '$0' uses an unsupported encoding: $1 for column "
+          "'$2'.", filename, PrintThriftEnum(encodings[i]), schema_element.name));
+    }
+  }
+
+  // Check the compression is supported.
+  if (col_chunk_metadata.codec != parquet::CompressionCodec::UNCOMPRESSED &&
+      col_chunk_metadata.codec != parquet::CompressionCodec::SNAPPY &&
+      col_chunk_metadata.codec != parquet::CompressionCodec::GZIP) {
+    return Status(Substitute("File '$0' uses an unsupported compression: $1 for column "
+        "'$2'.", filename, col_chunk_metadata.codec, schema_element.name));
+  }
+
+  if (col_chunk_metadata.type != schema_element.type) {
+    return Status(Substitute("Mismatched column chunk Parquet type in file '$0' column "
+            "'$1'. Expected $2 actual $3: file may be corrupt", filename,
+            schema_element.name, col_chunk_metadata.type, schema_element.type));
+  }
+  return Status::OK();
+}
+
+Status ParquetMetadataUtils::ValidateColumn(const char* filename,
+    const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
+    RuntimeState* state) {
+  // Following validation logic is only for non-complex types.
+  if (slot_desc->type().IsComplexType()) return Status::OK();
+
+  if (UNLIKELY(!IsSupportedType(slot_desc->type().type, schema_element))) {
+    return Status(Substitute("Unsupported Parquet type in file '$0' metadata. Logical "
+        "type: $1, physical type: $2. File may be corrupt.",
+        filename, slot_desc->type().type, schema_element.type));
+  }
+
+  // Check the decimal scale in the file matches the metastore scale and precision.
+  // We fail the query if the metadata makes it impossible for us to safely read
+  // the file. If we don't require the metadata, we will fail the query if
+  // abort_on_error is true, otherwise we will just log a warning.
+  bool is_converted_type_decimal = schema_element.__isset.converted_type
+      && schema_element.converted_type == parquet::ConvertedType::DECIMAL;
+  if (slot_desc->type().type == TYPE_DECIMAL) {
+    // TODO: allow converting to wider type (IMPALA-2515)
+    if (schema_element.type == parquet::Type::INT32 &&
+        sizeof(int32_t) != slot_desc->type().GetByteSize()) {
+      return Status(Substitute("File '$0' decimal column '$1' is stored as INT32, but "
+          "based on the precision in the table metadata, another type would needed.",
+          filename, schema_element.name));
+    }
+    if (schema_element.type == parquet::Type::INT64 &&
+        sizeof(int64_t) != slot_desc->type().GetByteSize()) {
+      return Status(Substitute("File '$0' decimal column '$1' is stored as INT64, but "
+          "based on the precision in the table metadata, another type would needed.",
+          filename, schema_element.name));
+    }
+    // We require that the scale and byte length be set.
+    if (schema_element.type == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+      if (!schema_element.__isset.type_length) {
+        return Status(Substitute("File '$0' column '$1' does not have type_length set.",
+            filename, schema_element.name));
+      }
+
+      int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
+      if (schema_element.type_length != expected_len) {
+        return Status(Substitute("File '$0' column '$1' has an invalid type length. "
+            "Expecting: $2 len in file: $3", filename, schema_element.name, expected_len,
+            schema_element.type_length));
+      }
+    }
+    if (!schema_element.__isset.scale) {
+      return Status(Substitute("File '$0' column '$1' does not have the scale set.",
+          filename, schema_element.name));
+    }
+
+    if (schema_element.scale != slot_desc->type().scale) {
+      // TODO: we could allow a mismatch and do a conversion at this step.
+      return Status(Substitute("File '$0' column '$1' has a scale that does not match "
+          "the table metadata scale. File metadata scale: $2 Table metadata scale: $3",
+          filename, schema_element.name, schema_element.scale, slot_desc->type().scale));
+    }
+
+    // The other decimal metadata should be there but we don't need it.
+    if (!schema_element.__isset.precision) {
+      ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename, schema_element.name);
+      RETURN_IF_ERROR(state->LogOrReturnError(msg));
+    } else {
+      if (schema_element.precision != slot_desc->type().precision) {
+        // TODO: we could allow a mismatch and do a conversion at this step.
+        ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION, filename, schema_element.name,
+            schema_element.precision, slot_desc->type().precision);
+        RETURN_IF_ERROR(state->LogOrReturnError(msg));
+      }
+    }
+
+    if (!is_converted_type_decimal) {
+      // TODO: is this validation useful? It is not required at all to read the data and
+      // might only serve to reject otherwise perfectly readable files.
+      ErrorMsg msg(TErrorCode::PARQUET_BAD_CONVERTED_TYPE, filename,
+          schema_element.name);
+      RETURN_IF_ERROR(state->LogOrReturnError(msg));
+    }
+  } else if (schema_element.__isset.scale || schema_element.__isset.precision
+      || is_converted_type_decimal) {
+    ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename, schema_element.name,
+        slot_desc->type().DebugString());
+    RETURN_IF_ERROR(state->LogOrReturnError(msg));
+  }
+  return Status::OK();
+}
+
+ParquetFileVersion::ParquetFileVersion(const string& created_by) {
+  string created_by_lower = created_by;
+  std::transform(created_by_lower.begin(), created_by_lower.end(),
+      created_by_lower.begin(), ::tolower);
+  is_impala_internal = false;
+
+  vector<string> tokens;
+  split(tokens, created_by_lower, is_any_of(" "), token_compress_on);
+  // Boost always creates at least one token
+  DCHECK_GT(tokens.size(), 0);
+  application = tokens[0];
+
+  if (tokens.size() >= 3 && tokens[1] == "version") {
+    string version_string = tokens[2];
+    // Ignore any trailing non-numeric characters, e.g. a "-internal" or "-cdh5" suffix.
+    int n = version_string.find_first_not_of("0123456789.");
+    string version_string_trimmed = version_string.substr(0, n);
+
+    vector<string> version_tokens;
+    split(version_tokens, version_string_trimmed, is_any_of("."));
+    version.major = version_tokens.size() >= 1 ? atoi(version_tokens[0].c_str()) : 0;
+    version.minor = version_tokens.size() >= 2 ? atoi(version_tokens[1].c_str()) : 0;
+    version.patch = version_tokens.size() >= 3 ? atoi(version_tokens[2].c_str()) : 0;
+
+    if (application == "impala") {
+      if (version_string.find("-internal") != string::npos) is_impala_internal = true;
+    }
+  } else {
+    version.major = 0;
+    version.minor = 0;
+    version.patch = 0;
+  }
+}
+
+bool ParquetFileVersion::VersionLt(int major, int minor, int patch) const {
+  if (version.major < major) return true;
+  if (version.major > major) return false;
+  DCHECK_EQ(version.major, major);
+  if (version.minor < minor) return true;
+  if (version.minor > minor) return false;
+  DCHECK_EQ(version.minor, minor);
+  return version.patch < patch;
+}
+
+bool ParquetFileVersion::VersionEq(int major, int minor, int patch) const {
+  return version.major == major && version.minor == minor && version.patch == patch;
+}
+
+static string PrintRepetitionType(const parquet::FieldRepetitionType::type& t) {
+  switch (t) {
+    case parquet::FieldRepetitionType::REQUIRED: return "required";
+    case parquet::FieldRepetitionType::OPTIONAL: return "optional";
+    case parquet::FieldRepetitionType::REPEATED: return "repeated";
+    default: return "<unknown>";
+  }
+}
+
+static string PrintParquetType(const parquet::Type::type& t) {
+  switch (t) {
+    case parquet::Type::BOOLEAN: return "boolean";
+    case parquet::Type::INT32: return "int32";
+    case parquet::Type::INT64: return "int64";
+    case parquet::Type::INT96: return "int96";
+    case parquet::Type::FLOAT: return "float";
+    case parquet::Type::DOUBLE: return "double";
+    case parquet::Type::BYTE_ARRAY: return "byte_array";
+    case parquet::Type::FIXED_LEN_BYTE_ARRAY: return "fixed_len_byte_array";
+    default: return "<unknown>";
+  }
+}
+
+string SchemaNode::DebugString(int indent) const {
+  stringstream ss;
+  for (int i = 0; i < indent; ++i) ss << " ";
+  ss << PrintRepetitionType(element->repetition_type) << " ";
+  if (element->num_children > 0) {
+    ss << "struct";
+  } else {
+    ss << PrintParquetType(element->type);
+  }
+  ss << " " << element->name << " [i:" << col_idx << " d:" << max_def_level
+     << " r:" << max_rep_level << "]";
+  if (element->num_children > 0) {
+    ss << " {" << endl;
+    for (int i = 0; i < element->num_children; ++i) {
+      ss << children[i].DebugString(indent + 2) << endl;
+    }
+    for (int i = 0; i < indent; ++i) ss << " ";
+    ss << "}";
+  }
+  return ss.str();
+}
+
+Status ParquetSchemaResolver::CreateSchemaTree(
+    const vector<parquet::SchemaElement>& schema, SchemaNode* node) const {
+  int idx = 0;
+  int col_idx = 0;
+  RETURN_IF_ERROR(CreateSchemaTree(schema, 0, 0, 0, &idx, &col_idx, node));
+  if (node->children.empty()) {
+    return Status(Substitute("Invalid file: '$0' has no columns.", filename_));
+  }
+  return Status::OK();
+}
+
+Status ParquetSchemaResolver::CreateSchemaTree(
+    const vector<parquet::SchemaElement>& schema, int max_def_level, int max_rep_level,
+    int ira_def_level, int* idx, int* col_idx, SchemaNode* node)
+    const {
+  if (*idx >= schema.size()) {
+    return Status(Substitute("File '$0' corrupt: could not reconstruct schema tree from "
+            "flattened schema in file metadata", filename_));
+  }
+  bool is_root_schema = (*idx == 0);
+  node->element = &schema[*idx];
+  ++(*idx);
+
+  if (node->element->num_children == 0) {
+    // node is a leaf node, meaning it's materialized in the file and appears in
+    // file_metadata_.row_groups.columns
+    node->col_idx = *col_idx;
+    ++(*col_idx);
+  } else if (node->element->num_children > SCHEMA_NODE_CHILDREN_SANITY_LIMIT) {
+    // Sanity-check the schema to avoid allocating absurdly large buffers below.
+    return Status(Substitute("Schema in Parquet file '$0' has $1 children, more than "
+        "limit of $2. File is likely corrupt", filename_, node->element->num_children,
+        SCHEMA_NODE_CHILDREN_SANITY_LIMIT));
+  } else if (node->element->num_children < 0) {
+    return Status(Substitute("Corrupt Parquet file '$0': schema element has $1 children.",
+        filename_, node->element->num_children));
+  }
+
+  // def_level_of_immediate_repeated_ancestor does not include this node, so set before
+  // updating ira_def_level
+  node->def_level_of_immediate_repeated_ancestor = ira_def_level;
+
+  if (node->element->repetition_type == parquet::FieldRepetitionType::OPTIONAL) {
+    ++max_def_level;
+  } else if (node->element->repetition_type == parquet::FieldRepetitionType::REPEATED &&
+             !is_root_schema /*PARQUET-843*/) {
+    ++max_rep_level;
+    // Repeated fields add a definition level. This is used to distinguish between an
+    // empty list and a list with an item in it.
+    ++max_def_level;
+    // node is the new most immediate repeated ancestor
+    ira_def_level = max_def_level;
+  }
+  node->max_def_level = max_def_level;
+  node->max_rep_level = max_rep_level;
+
+  node->children.resize(node->element->num_children);
+  for (int i = 0; i < node->element->num_children; ++i) {
+    RETURN_IF_ERROR(CreateSchemaTree(schema, max_def_level, max_rep_level, ira_def_level,
+        idx, col_idx, &node->children[i]));
+  }
+  return Status::OK();
+}
+
+Status ParquetSchemaResolver::ResolvePath(const SchemaPath& path, SchemaNode** node,
+    bool* pos_field, bool* missing_field) const {
+  *missing_field = false;
+  const vector<ArrayEncoding>& ordered_array_encodings =
+      ORDERED_ARRAY_ENCODINGS[array_resolution_];
+
+  bool any_missing_field = false;
+  Status statuses[NUM_ARRAY_ENCODINGS];
+  for (const auto& array_encoding : ordered_array_encodings) {
+    bool current_missing_field;
+    statuses[array_encoding] = ResolvePathHelper(
+        array_encoding, path, node, pos_field, &current_missing_field);
+    if (current_missing_field) DCHECK(statuses[array_encoding].ok());
+    if (statuses[array_encoding].ok() && !current_missing_field) return Status::OK();
+    any_missing_field = any_missing_field || current_missing_field;
+  }
+  // None of the resolutions yielded a node. Set *missing_field to true if any of the
+  // resolutions reported a missing field.
+  if (any_missing_field) {
+    *node = NULL;
+    *missing_field = true;
+    return Status::OK();
+  }
+
+  // All resolutions failed. Log and return the most relevant status. The three-level
+  // encoding is the Parquet standard, so always prefer that. Prefer the two-level over
+  // the one-level because the two-level can be specifically selected via a query option.
+  Status error_status = Status::OK();
+  for (int i = THREE_LEVEL; i >= ONE_LEVEL; --i) {
+    if (!statuses[i].ok()) {
+      error_status = statuses[i];
+      break;
+    }
+  }
+  DCHECK(!error_status.ok());
+  *node = NULL;
+  VLOG_QUERY << error_status.msg().msg() << "\n" << GetStackTrace();
+  return error_status;
+}
+
+Status ParquetSchemaResolver::ResolvePathHelper(ArrayEncoding array_encoding,
+    const SchemaPath& path, SchemaNode** node, bool* pos_field,
+    bool* missing_field) const {
+  DCHECK(schema_.element != NULL)
+      << "schema_ must be initialized before calling ResolvePath()";
+
+  *pos_field = false;
+  *missing_field = false;
+  *node = const_cast<SchemaNode*>(&schema_);
+  const ColumnType* col_type = NULL;
+
+  // Traverse 'path' and resolve 'node' to the corresponding SchemaNode in 'schema_' (by
+  // ordinal), or set 'node' to NULL if 'path' doesn't exist in this file's schema.
+  for (int i = 0; i < path.size(); ++i) {
+    // Advance '*node' if necessary
+    if (i == 0 || col_type->type != TYPE_ARRAY || array_encoding == THREE_LEVEL) {
+      *node = NextSchemaNode(col_type, path, i, *node, missing_field);
+      if (*missing_field) return Status::OK();
+    } else {
+      // We just resolved an array, meaning *node is set to the repeated field of the
+      // array. Since we are trying to resolve using one- or two-level array encoding, the
+      // repeated field represents both the array and the array's item (i.e. there is no
+      // explicit item field), so we don't advance *node in this case.
+      DCHECK(col_type != NULL);
+      DCHECK_EQ(col_type->type, TYPE_ARRAY);
+      DCHECK(array_encoding == ONE_LEVEL || array_encoding == TWO_LEVEL);
+      DCHECK((*node)->is_repeated());
+    }
+
+    // Advance 'col_type'
+    int table_idx = path[i];
+    col_type = i == 0 ? &tbl_desc_.col_descs()[table_idx].type()
+               : &col_type->children[table_idx];
+
+    // Resolve path[i]
+    if (col_type->type == TYPE_ARRAY) {
+      DCHECK_EQ(col_type->children.size(), 1);
+      RETURN_IF_ERROR(
+          ResolveArray(array_encoding, path, i, node, pos_field, missing_field));
+      if (*missing_field || *pos_field) return Status::OK();
+    } else if (col_type->type == TYPE_MAP) {
+      DCHECK_EQ(col_type->children.size(), 2);
+      RETURN_IF_ERROR(ResolveMap(path, i, node, missing_field));
+      if (*missing_field) return Status::OK();
+    } else if (col_type->type == TYPE_STRUCT) {
+      DCHECK_GT(col_type->children.size(), 0);
+      // Nothing to do for structs
+    } else {
+      DCHECK(!col_type->IsComplexType());
+      DCHECK_EQ(i, path.size() - 1);
+      RETURN_IF_ERROR(ValidateScalarNode(**node, *col_type, path, i));
+    }
+  }
+  DCHECK(*node != NULL);
+  return Status::OK();
+}
+
+SchemaNode* ParquetSchemaResolver::NextSchemaNode(
+    const ColumnType* col_type, const SchemaPath& path, int next_idx, SchemaNode* node,
+    bool* missing_field) const {
+  DCHECK_LT(next_idx, path.size());
+  if (next_idx != 0) DCHECK(col_type != NULL);
+
+  int file_idx;
+  int table_idx = path[next_idx];
+  if (fallback_schema_resolution_ == TParquetFallbackSchemaResolution::type::NAME) {
+    if (next_idx == 0) {
+      // Resolve top-level table column by name.
+      DCHECK_LT(table_idx, tbl_desc_.col_descs().size());
+      const string& name = tbl_desc_.col_descs()[table_idx].name();
+      file_idx = FindChildWithName(node, name);
+    } else if (col_type->type == TYPE_STRUCT) {
+      // Resolve struct field by name.
+      DCHECK_LT(table_idx, col_type->field_names.size());
+      const string& name = col_type->field_names[table_idx];
+      file_idx = FindChildWithName(node, name);
+    } else if (col_type->type == TYPE_ARRAY) {
+      // Arrays have only one child in the file.
+      DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM);
+      file_idx = table_idx;
+    } else {
+      DCHECK_EQ(col_type->type, TYPE_MAP);
+      // Maps have two fields, "key" and "value". These are supposed to be ordered and may
+      // not have the right field names, but try to resolve by name in case they're
+      // switched and otherwise use the order. See
+      // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for
+      // more details.
+      DCHECK(table_idx == SchemaPathConstants::MAP_KEY ||
+             table_idx == SchemaPathConstants::MAP_VALUE);
+      const string& name = table_idx == SchemaPathConstants::MAP_KEY ? "key" : "value";
+      file_idx = FindChildWithName(node, name);
+      if (file_idx >= node->children.size()) {
+        // Couldn't resolve by name, fall back to resolution by position.
+        file_idx = table_idx;
+      }
+    }
+  } else {
+    // Resolution by position.
+    DCHECK_EQ(fallback_schema_resolution_,
+        TParquetFallbackSchemaResolution::type::POSITION);
+    if (next_idx == 0) {
+      // For top-level columns, the first index in a path includes the table's partition
+      // keys.
+      file_idx = table_idx - tbl_desc_.num_clustering_cols();
+    } else {
+      file_idx = table_idx;
+    }
+  }
+
+  if (file_idx >= node->children.size()) {
+    string schema_resolution_mode = "unknown";
+    auto entry = _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.find(
+        fallback_schema_resolution_);
+    if (entry != _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.end()) {
+      schema_resolution_mode = entry->second;
+    }
+    VLOG_FILE << Substitute(
+        "File '$0' does not contain path '$1' (resolving by $2)", filename_,
+        PrintPath(tbl_desc_, path), schema_resolution_mode);
+    *missing_field = true;
+    return NULL;
+  }
+  return &node->children[file_idx];
+}
+
+int ParquetSchemaResolver::FindChildWithName(SchemaNode* node,
+    const string& name) const {
+  int idx;
+  for (idx = 0; idx < node->children.size(); ++idx) {
+    if (strcasecmp(node->children[idx].element->name.c_str(), name.c_str()) == 0) break;
+  }
+  return idx;
+}
+
+// There are three types of array encodings:
+//
+// 1. One-level encoding
+//      A bare repeated field. This is interpreted as a required array of required
+//      items.
+//    Example:
+//      repeated <item-type> item;
+//
+// 2. Two-level encoding
+//      A group containing a single repeated field. This is interpreted as a
+//      <list-repetition> array of required items (<list-repetition> is either
+//      optional or required).
+//    Example:
+//      <list-repetition> group <name> {
+//        repeated <item-type> item;
+//      }
+//
+// 3. Three-level encoding
+//      The "official" encoding according to the parquet spec. A group containing a
+//      single repeated group containing the item field. This is interpreted as a
+//      <list-repetition> array of <item-repetition> items (<list-repetition> and
+//      <item-repetition> are each either optional or required).
+//    Example:
+//      <list-repetition> group <name> {
+//        repeated group list {
+//          <item-repetition> <item-type> item;
+//        }
+//      }
+//
+// We ignore any field annotations or names, making us more permissive than the
+// Parquet spec dictates. Note that in any of the encodings, <item-type> may be a
+// group containing more fields, which corresponds to a complex item type. See
+// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists for
+// more details and examples.
+//
+// This function resolves the array at '*node' assuming one-, two-, or three-level
+// encoding, determined by 'array_encoding'. '*node' is set to the repeated field for all
+// three encodings (unless '*pos_field' or '*missing_field' are set to true).
+Status ParquetSchemaResolver::ResolveArray(ArrayEncoding array_encoding,
+    const SchemaPath& path, int idx, SchemaNode** node, bool* pos_field,
+    bool* missing_field) const {
+  if (array_encoding == ONE_LEVEL) {
+    if (!(*node)->is_repeated()) {
+      ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+          PrintSubPath(tbl_desc_, path, idx), "array", (*node)->DebugString());
+      return Status::Expected(msg);
+    }
+  } else {
+    // In the multi-level case, we always expect the outer group to contain a single
+    // repeated field
+    if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated()) {
+      ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+          PrintSubPath(tbl_desc_, path, idx), "array", (*node)->DebugString());
+      return Status::Expected(msg);
+    }
+    // Set *node to the repeated field
+    *node = &(*node)->children[0];
+  }
+  DCHECK((*node)->is_repeated());
+
+  if (idx + 1 < path.size()) {
+    if (path[idx + 1] == SchemaPathConstants::ARRAY_POS) {
+      // The next index in 'path' is the artificial position field.
+      DCHECK_EQ(path.size(), idx + 2) << "position field cannot have children!";
+      *pos_field = true;
+      *node = NULL;
+      return Status::OK();
+    } else {
+      // The next value in 'path' should be the item index
+      DCHECK_EQ(path[idx + 1], SchemaPathConstants::ARRAY_ITEM);
+    }
+  }
+  return Status::OK();
+}
+
+// According to the parquet spec, map columns are represented like:
+// <map-repetition> group <name> (MAP) {
+//   repeated group key_value {
+//     required <key-type> key;
+//     <value-repetition> <value-type> value;
+//   }
+// }
+// We ignore any field annotations or names, making us more permissive than the
+// Parquet spec dictates. See
+// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for
+// more details.
+Status ParquetSchemaResolver::ResolveMap(const SchemaPath& path, int idx,
+    SchemaNode** node, bool* missing_field) const {
+  if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated() ||
+      (*node)->children[0].children.size() != 2) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), "map", (*node)->DebugString());
+    return Status::Expected(msg);
+  }
+  *node = &(*node)->children[0];
+
+  // The next index in 'path' should be the key or the value.
+  if (idx + 1 < path.size()) {
+    DCHECK(path[idx + 1] == SchemaPathConstants::MAP_KEY ||
+           path[idx + 1] == SchemaPathConstants::MAP_VALUE);
+  }
+  return Status::OK();
+}
+
+Status ParquetSchemaResolver::ValidateScalarNode(const SchemaNode& node,
+    const ColumnType& col_type, const SchemaPath& path, int idx) const {
+  if (!node.children.empty()) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
+    return Status::Expected(msg);
+  }
+  if (!IsSupportedType(col_type.type, *node.element)) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
+    return Status::Expected(msg);
+  }
+  return Status::OK();
+}
+
+} // namespace impala
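
To make the version parsing and comparison semantics above concrete, here is a
hypothetical sketch (the created_by string is illustrative, not taken from a
real file):

    #include "exec/parquet/parquet-metadata-utils.h"

    void FileVersionExample() {
      // created_by strings look like "<application> version <version> (<build>)".
      impala::ParquetFileVersion v("Impala version 1.2.3-INTERNAL (build abcd)");
      // The string is lower-cased first, so application == "impala",
      // version == {1, 2, 3} (the "-internal" suffix is trimmed before the
      // dotted parts are parsed), and is_impala_internal == true.
      bool lt = v.VersionLt(1, 3);     // true: 1.2.3 < 1.3.0
      bool eq = v.VersionEq(1, 2, 3);  // true
      (void)lt;
      (void)eq;
    }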

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-metadata-utils.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-metadata-utils.h b/be/src/exec/parquet/parquet-metadata-utils.h
new file mode 100644
index 0000000..f3a144d
--- /dev/null
+++ b/be/src/exec/parquet/parquet-metadata-utils.h
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_METADATA_UTILS_H
+#define IMPALA_EXEC_PARQUET_METADATA_UTILS_H
+
+#include <string>
+
+#include "runtime/descriptors.h"
+#include "gen-cpp/parquet_types.h"
+
+namespace impala {
+
+class RuntimeState;
+
+class ParquetMetadataUtils {
+ public:
+  /// Checks the version of the given file and returns a non-OK status if
+  /// Impala does not support that version.
+  static Status ValidateFileVersion(const parquet::FileMetaData& file_metadata,
+      const char* filename);
+
+  /// Validate column offsets by checking if the dictionary page comes before the data
+  /// pages and checking if the column offsets lie within the file.
+  static Status ValidateColumnOffsets(const std::string& filename, int64_t file_length,
+      const parquet::RowGroup& row_group);
+
+  /// Check that a file offset is in the file. Return an error status with a detailed
+  /// error message if it is not.
+  static Status ValidateOffsetInFile(const std::string& filename, int col_idx,
+      int64_t file_length, int64_t offset, const std::string& offset_name);
+
+  /// Validates the column metadata inside a row group to make sure this column is
+  /// supported (e.g. encoding, type, etc).
+  static Status ValidateRowGroupColumn(const parquet::FileMetaData& file_metadata,
+      const char* filename, int row_group_idx, int col_idx,
+      const parquet::SchemaElement& schema_element, RuntimeState* state);
+
+  /// Validates the column metadata to make sure the column is supported and its type
+  /// attributes conform to the parquet spec.
+  static Status ValidateColumn(const char* filename,
+      const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
+      RuntimeState* state);
+};
+
+struct ParquetFileVersion {
+  /// Application that wrote the file, lower-cased during parsing. e.g. "impala"
+  std::string application;
+
+  /// Version of the application that wrote the file, expressed in three parts
+  /// (<major>.<minor>.<patch>). Unspecified parts default to 0, and extra parts are
+  /// ignored. e.g.:
+  /// "1.2.3"    => {1, 2, 3}
+  /// "1.2"      => {1, 2, 0}
+  /// "1.2-cdh5" => {1, 2, 0}
+  struct {
+    int major;
+    int minor;
+    int patch;
+  } version;
+
+  /// If true, this file was generated by an Impala internal release
+  bool is_impala_internal;
+
+  ParquetFileVersion() : is_impala_internal(false) { }
+
+  /// Parses the version from the created_by string
+  ParquetFileVersion(const std::string& created_by);
+
+  /// Returns true if version is strictly less than <major>.<minor>.<patch>
+  bool VersionLt(int major, int minor = 0, int patch = 0) const;
+
+  /// Returns true if version is equal to <major>.<minor>.<patch>
+  bool VersionEq(int major, int minor, int patch) const;
+};
+
+/// Internal representation of a Parquet schema (including nested-type columns).
+struct SchemaNode {
+  /// The corresponding schema element defined in the file metadata
+  const parquet::SchemaElement* element;
+
+  /// The index into the RowGroup::columns list if this column is materialized in the
+  /// file (i.e. it's a scalar type). -1 for nested types.
+  int col_idx;
+
+  /// The maximum definition level of this column, i.e., the definition level that
+  /// corresponds to a non-NULL value. Valid values are >= 0.
+  int max_def_level;
+
+  /// The maximum repetition level of this column. Valid values are >= 0.
+  int max_rep_level;
+
+  /// The definition level of the most immediate ancestor of this node with repeated
+  /// field repetition type. 0 if there are no repeated ancestors.
+  int def_level_of_immediate_repeated_ancestor;
+
+  /// Any nested schema nodes. Empty for non-nested types.
+  std::vector<SchemaNode> children;
+
+  SchemaNode() : element(NULL), col_idx(-1), max_def_level(-1), max_rep_level(-1),
+                 def_level_of_immediate_repeated_ancestor(-1) { }
+
+  std::string DebugString(int indent = 0) const;
+
+  bool is_repeated() const {
+    return element->repetition_type == parquet::FieldRepetitionType::REPEATED;
+  }
+};
+
+/// Utility class to resolve SchemaPaths (e.g., from a table descriptor) against a
+/// Parquet file schema. Supports resolution by field index or by field name.
+/// Supports different policies for resolving nested arrays based on the modern
+/// three-level encoding or the legacy encodings (one and two level).
+class ParquetSchemaResolver {
+ public:
+  ParquetSchemaResolver(const HdfsTableDescriptor& tbl_desc,
+      TParquetFallbackSchemaResolution::type fallback_schema_resolution,
+      TParquetArrayResolution::type array_resolution)
+    : tbl_desc_(tbl_desc),
+      fallback_schema_resolution_(fallback_schema_resolution),
+      array_resolution_(array_resolution),
+      filename_(NULL) {
+  }
+
+  /// Parses the schema of the given file metadata into an internal schema
+  /// representation used in path resolution. Remembers the filename for error
+  /// reporting. Returns a non-OK status if the Parquet schema could not be parsed.
+  Status Init(const parquet::FileMetaData* file_metadata, const char* filename) {
+    DCHECK(filename != NULL);
+    filename_ = filename;
+    return CreateSchemaTree(file_metadata->schema, &schema_);
+  }
+
+  /// Traverses 'schema_' according to 'path', returning the result in 'node'. If 'path'
+  /// does not exist in this file's schema, 'missing_field' is set to true and
+  /// Status::OK() is returned, otherwise 'missing_field' is set to false. If 'path'
+  /// resolves to a collection position field, *pos_field is set to true. Otherwise
+  /// 'pos_field' is set to false. Returns a non-OK status if 'path' cannot be resolved
+  /// against the file's schema (e.g., unrecognized collection schema).
+  ///
+  /// Tries to resolve fields within lists according to the 'ordered_array_encodings_'.
+  /// Returns a bad status if resolution fails for all attempted array encodings.
+  Status ResolvePath(const SchemaPath& path, SchemaNode** node, bool* pos_field,
+      bool* missing_field) const;
+
+ private:
+  /// The array encodings that path resolution may assume. ResolvePathHelper() takes
+  /// an 'array_encoding' parameter that determines whether to resolve arrays assuming
+  /// the one-, two-, or three-level encoding.
+  enum ArrayEncoding {
+    ONE_LEVEL,
+    TWO_LEVEL,
+    THREE_LEVEL,
+    NUM_ARRAY_ENCODINGS
+  };
+
+  /// An arbitrary limit on the number of children per schema node we support.
+  /// Used to sanity-check Parquet schemas.
+  static const int SCHEMA_NODE_CHILDREN_SANITY_LIMIT = 64 * 1024;
+
+  /// Maps from the array-resolution policy to the ordered array encodings that should
+  /// be tried during path resolution. All entries have the ONE_LEVEL encoding at the end
+  /// because there is no ambiguity between the one-level and the other encodings (there
+  /// is no harm in trying it).
+  static const std::vector<ArrayEncoding> ORDERED_ARRAY_ENCODINGS[];
+
+  /// Unflattens the schema metadata from a Parquet file metadata and converts it to our
+  /// SchemaNode representation. Returns the result in 'node' unless an error status is
+  /// returned. Does not set the slot_desc field of any SchemaNode.
+  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
+      SchemaNode* node) const;
+
+  /// Recursive implementation used internally by the above CreateSchemaTree() function.
+  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
+      int max_def_level, int max_rep_level, int ira_def_level, int* idx, int* col_idx,
+      SchemaNode* node) const;
+
+  /// Attempts to resolve 'path' against the file schema assuming the given
+  /// 'array_encoding'. The returned status is not logged (i.e. it's an expected error).
+  Status ResolvePathHelper(ArrayEncoding array_encoding, const SchemaPath& path,
+      SchemaNode** node, bool* pos_field, bool* missing_field) const;
+
+  /// Helper functions for ResolvePathHelper().
+
+  /// Advances 'node' to one of its children based on path[next_idx] and
+  /// 'col_type'. 'col_type' is NULL if 'node' is the root node, otherwise it's the type
+  /// associated with 'node'. Returns the child node or sets 'missing_field' to true.
+  SchemaNode* NextSchemaNode(const ColumnType* col_type, const SchemaPath& path,
+      int next_idx, SchemaNode* node, bool* missing_field) const;
+
+  /// Returns the index of 'node's child with 'name', or the number of children if not
+  /// found. The name comparison is case-insensitive because that's how Impala treats
+  /// db/table/column/field names. If there are several matches with different casing,
+  /// then the index of the first match is returned.
+  int FindChildWithName(SchemaNode* node, const std::string& name) const;
+
+  /// The ResolvePathHelper() logic for arrays.
+  Status ResolveArray(ArrayEncoding array_encoding, const SchemaPath& path, int idx,
+      SchemaNode** node, bool* pos_field, bool* missing_field) const;
+
+  /// The ResolvePathHelper() logic for maps.
+  Status ResolveMap(const SchemaPath& path, int idx, SchemaNode** node,
+      bool* missing_field) const;
+
+  /// The ResolvePathHelper() logic for scalars (just does validation since there's no
+  /// more actual work to be done).
+  Status ValidateScalarNode(const SchemaNode& node, const ColumnType& col_type,
+      const SchemaPath& path, int idx) const;
+
+  const HdfsTableDescriptor& tbl_desc_;
+  const TParquetFallbackSchemaResolution::type fallback_schema_resolution_;
+  const TParquetArrayResolution::type array_resolution_;
+  const char* filename_;
+
+  /// Root node of our internal schema representation populated in Init().
+  SchemaNode schema_;
+};
+
+} // namespace impala
+
+#endif
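
As a usage note for ResolvePath() above: resolution can end in three benign
ways (a resolved node, a missing field, or an artificial position field), and
only a schema mismatch produces a non-OK status. A hypothetical caller sketch
(ResolveColumn and its handling are illustrations, not part of this patch):

    #include "exec/parquet/parquet-metadata-utils.h"

    namespace impala {

    Status ResolveColumn(const ParquetSchemaResolver& resolver,
        const SchemaPath& path) {
      SchemaNode* node = nullptr;
      bool pos_field = false;
      bool missing_field = false;
      // Non-OK only if 'path' cannot be resolved against the file schema at
      // all (e.g. an unrecognized collection encoding).
      RETURN_IF_ERROR(resolver.ResolvePath(path, &node, &pos_field, &missing_field));
      if (missing_field) {
        // Column absent from this file; a scanner would fill the slot with NULLs.
        return Status::OK();
      }
      if (pos_field) {
        // Artificial collection position slot; no physical column to read.
        return Status::OK();
      }
      // Otherwise 'node' points at the resolved node in the file schema.
      return Status::OK();
    }

    } // namespace impala

The fallback and array-resolution policies passed to the constructor are set
from query options (PARQUET_FALLBACK_SCHEMA_RESOLUTION and
PARQUET_ARRAY_RESOLUTION).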

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-plain-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-plain-test.cc b/be/src/exec/parquet/parquet-plain-test.cc
new file mode 100644
index 0000000..6eb880f
--- /dev/null
+++ b/be/src/exec/parquet/parquet-plain-test.cc
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <iostream>
+#include "exec/parquet/parquet-common.h"
+#include "runtime/decimal-value.h"
+#include "runtime/string-value.inline.h"
+#include "runtime/timestamp-value.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+template <typename InternalType>
+int Encode(const InternalType& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type physical_type) {
+  return ParquetPlainEncoder::Encode(v, encoded_byte_size, buffer);
+}
+
+// Handles the special case of encoding decimal types stored as BYTE_ARRAY, INT32, or
+// INT64, since Impala's ParquetPlainEncoder does not implement these encodings.
+// When parquet_type is BYTE_ARRAY: 'encoded_byte_size' is the sum of the minimum
+// number of bytes required to store the unscaled value and the bytes required to
+// store the size. The value 'v' should not contain leading zeros, as this method
+// does not strictly conform to the parquet spec in removing those.
+// When parquet_type is INT32 or INT64, the unscaled value is simply written to the
+// buffer.
+template <typename DecimalType>
+int EncodeDecimal(const DecimalType& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  if (parquet_type == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+    return ParquetPlainEncoder::Encode(v, encoded_byte_size, buffer);
+  } else if (parquet_type == parquet::Type::BYTE_ARRAY) {
+    int decimal_size = encoded_byte_size - sizeof(int32_t);
+    memcpy(buffer, &decimal_size, sizeof(int32_t));
+    DecimalUtil::EncodeToFixedLenByteArray(buffer + sizeof(int32_t), decimal_size, v);
+    return encoded_byte_size;
+  } else if (parquet_type == parquet::Type::INT32 ||
+             parquet_type == parquet::Type::INT64) {
+    return ParquetPlainEncoder::Encode(v.value(), encoded_byte_size, buffer);
+  }
+  return -1;
+}
+
+template<>
+int Encode(const Decimal4Value& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  return EncodeDecimal(v, encoded_byte_size, buffer, parquet_type);
+}
+
+template<>
+int Encode(const Decimal8Value& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  return EncodeDecimal(v, encoded_byte_size, buffer, parquet_type);
+}
+
+template<>
+int Encode(const Decimal16Value& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  return EncodeDecimal(v, encoded_byte_size, buffer, parquet_type);
+}
+
+/// Test that the decoder fails when asked to decode a truncated value.
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void TestTruncate(const InternalType& v, int expected_byte_size) {
+  uint8_t buffer[expected_byte_size];
+  int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
+  EXPECT_EQ(encoded_size, expected_byte_size);
+
+  // Check all possible truncations of the buffer.
+  for (int truncated_size = encoded_size - 1; truncated_size >= 0; --truncated_size) {
+    InternalType result;
+    // Copy to heap-allocated buffer so that ASAN can detect buffer overruns.
+    uint8_t* truncated_buffer = new uint8_t[truncated_size];
+    memcpy(truncated_buffer, buffer, truncated_size);
+    int decoded_size = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
+        truncated_buffer, truncated_buffer + truncated_size, expected_byte_size, &result);
+    EXPECT_EQ(-1, decoded_size);
+    delete[] truncated_buffer;
+  }
+}
+
+template <typename InternalType, typename WidenInternalType,
+    parquet::Type::type PARQUET_TYPE>
+void TestTruncate(const InternalType& v, int expected_byte_size) {
+  uint8_t buffer[expected_byte_size];
+  int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
+  EXPECT_EQ(encoded_size, expected_byte_size);
+
+  // Check all possible truncations of the buffer.
+  for (int truncated_size = encoded_size - 1; truncated_size >= 0; --truncated_size) {
+    WidenInternalType result;
+    // Copy to heap-allocated buffer so that ASAN can detect buffer overruns.
+    uint8_t* truncated_buffer = new uint8_t[truncated_size];
+    memcpy(truncated_buffer, buffer, truncated_size);
+    int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType, PARQUET_TYPE>(
+        truncated_buffer, truncated_buffer + truncated_size, expected_byte_size,
+        &result);
+    EXPECT_EQ(-1, decoded_size);
+    delete[] truncated_buffer;
+  }
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void TestType(const InternalType& v, int expected_byte_size) {
+  uint8_t buffer[expected_byte_size];
+  int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
+  EXPECT_EQ(encoded_size, expected_byte_size);
+
+  InternalType result;
+  int decoded_size = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(buffer,
+      buffer + expected_byte_size, expected_byte_size, &result);
+  EXPECT_EQ(decoded_size, expected_byte_size);
+  EXPECT_EQ(result, v);
+
+  TestTruncate<InternalType, PARQUET_TYPE>(v, expected_byte_size);
+}
+
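+/// Test that a value of type InternalType encoded as PARQUET_TYPE decodes
+/// correctly into the wider WidenInternalType.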
+template <typename InternalType, typename WidenInternalType,
+    parquet::Type::type PARQUET_TYPE>
+void TestTypeWidening(const InternalType& v, int expected_byte_size) {
+  uint8_t buffer[expected_byte_size];
+  int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
+  EXPECT_EQ(encoded_size, expected_byte_size);
+
+  WidenInternalType result;
+  int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType, PARQUET_TYPE>(
+      buffer, buffer + expected_byte_size, expected_byte_size, &result);
+  EXPECT_EQ(decoded_size, expected_byte_size);
+  EXPECT_EQ(v, result);
+
+  TestTruncate<InternalType, WidenInternalType, PARQUET_TYPE>(
+      v, expected_byte_size);
+}
+
+TEST(PlainEncoding, Basic) {
+  int8_t i8 = 12;
+  int16_t i16 = 123;
+  int32_t i32 = 1234;
+  int64_t i64 = 12345;
+  float f = 1.23f;
+  double d = 1.23456;
+  StringValue sv("Hello");
+  TimestampValue tv;
+
+  TestType<int8_t, parquet::Type::INT32>(i8, sizeof(int32_t));
+  TestType<int16_t, parquet::Type::INT32>(i16, sizeof(int32_t));
+  TestType<int32_t, parquet::Type::INT32>(i32, sizeof(int32_t));
+  TestType<int64_t, parquet::Type::INT64>(i64, sizeof(int64_t));
+  TestType<float, parquet::Type::FLOAT>(f, sizeof(float));
+  TestType<double, parquet::Type::DOUBLE>(d, sizeof(double));
+  TestType<StringValue, parquet::Type::BYTE_ARRAY>(sv, sizeof(int32_t) + sv.len);
+  TestType<TimestampValue, parquet::Type::INT96>(tv, 12);
+
+  // Test type widening.
+  TestTypeWidening<int32_t, int64_t, parquet::Type::INT32>(i32, sizeof(int32_t));
+  TestTypeWidening<int32_t, double, parquet::Type::INT32>(i32, sizeof(int32_t));
+  TestTypeWidening<float, double, parquet::Type::FLOAT>(f, sizeof(float));
+
+  int test_val = 1234;
+  int var_len_decimal_size = sizeof(int32_t)
+      + 2 /* min bytes required for storing test_val */;
+  // Decimal4Value: General test case
+  TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(test_val),
+      var_len_decimal_size);
+  TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(test_val * -1),
+      var_len_decimal_size);
+  TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(test_val),
+      sizeof(Decimal4Value));
+  TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal4Value(test_val * -1), sizeof(Decimal4Value));
+  TestType<Decimal4Value, parquet::Type::INT32>(Decimal4Value(test_val),
+      sizeof(int32_t));
+  TestType<Decimal4Value, parquet::Type::INT32>(Decimal4Value(test_val * -1),
+      sizeof(int32_t));
+
+  // Decimal8Value: General test case
+  TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(test_val),
+      var_len_decimal_size);
+  TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(test_val * -1),
+      var_len_decimal_size);
+  TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(test_val),
+      sizeof(Decimal8Value));
+  TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal8Value(test_val * -1), sizeof(Decimal8Value));
+  TestType<Decimal8Value, parquet::Type::INT64>(Decimal8Value(test_val),
+      sizeof(int64_t));
+  TestType<Decimal8Value, parquet::Type::INT64>(Decimal8Value(test_val * -1),
+      sizeof(int64_t));
+
+  // Decimal16Value: General test case
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(test_val),
+      var_len_decimal_size);
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(test_val * -1),
+      var_len_decimal_size);
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(test_val), sizeof(Decimal16Value));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(test_val * -1), sizeof(Decimal16Value));
+
+  // Decimal8Value: int32 limits test
+  TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(
+      Decimal8Value(std::numeric_limits<int32_t>::max()),
+      sizeof(int32_t) + sizeof(int32_t));
+  TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(
+      Decimal8Value(std::numeric_limits<int32_t>::min()),
+      sizeof(int32_t) + sizeof(int32_t));
+  TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal8Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal8Value));
+  TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal8Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal8Value));
+  TestType<Decimal8Value, parquet::Type::INT64>(
+      Decimal8Value(std::numeric_limits<int32_t>::max()), sizeof(int64_t));
+  TestType<Decimal8Value, parquet::Type::INT64>(
+      Decimal8Value(std::numeric_limits<int32_t>::min()), sizeof(int64_t));
+
+  // Decimal16Value: int32 limits test
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int32_t>::max()),
+      sizeof(int32_t) + sizeof(int32_t));
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int32_t>::min()),
+      sizeof(int32_t) + sizeof(int32_t));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal16Value));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal16Value));
+
+  // Decimal16Value: int64 limits test
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int64_t>::max()),
+      sizeof(int32_t) + sizeof(int64_t));
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int64_t>::min()),
+      sizeof(int32_t) + sizeof(int64_t));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int64_t>::max()), sizeof(Decimal16Value));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int64_t>::min()), sizeof(Decimal16Value));
+
+  // Values with at most two digits can be encoded with any byte size.
+  for (int i = 1; i <= 16; ++i) {
+    if (i <= 4) {
+      TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(i),
+          i + sizeof(int32_t));
+      TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(-i),
+          i + sizeof(int32_t));
+      TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(i), i);
+      TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(-i), i);
+      TestType<Decimal4Value, parquet::Type::INT32>(Decimal4Value(i), sizeof(int32_t));
+      TestType<Decimal4Value, parquet::Type::INT32>(Decimal4Value(-i), sizeof(int32_t));
+    }
+    if (i <= 8) {
+      TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(i),
+          i + sizeof(int32_t));
+      TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(-i),
+          i + sizeof(int32_t));
+      TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(i), i);
+      TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(-i), i);
+      TestType<Decimal8Value, parquet::Type::INT64>(Decimal8Value(i), sizeof(int64_t));
+      TestType<Decimal8Value, parquet::Type::INT64>(Decimal8Value(-i), sizeof(int64_t));
+    }
+    TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(i),
+        i + sizeof(int32_t));
+    TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(-i),
+        i + sizeof(int32_t));
+    TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal16Value(i), i);
+    TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal16Value(-i), i);
+  }
+}
+
+TEST(PlainEncoding, DecimalBigEndian) {
+  // The Basic test could pass even if we made the same error in both encode and
+  // decode, so verify here that the encoded bytes are actually big endian.
+  uint8_t buffer[] = {
+    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
+  };
+
+  // Manually generate this to avoid potential bugs in BitUtil.
+  uint8_t buffer_swapped[] = {
+    15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
+  };
+  uint8_t result_buffer[16];
+
+  Decimal4Value d4;
+  Decimal8Value d8;
+  Decimal16Value d16;
+
+  memcpy(&d4, buffer, sizeof(d4));
+  memcpy(&d8, buffer, sizeof(d8));
+  memcpy(&d16, buffer, sizeof(d16));
+
+  int size = ParquetPlainEncoder::Encode(d4, sizeof(d4), result_buffer);
+  ASSERT_EQ(size, sizeof(d4));
+  ASSERT_EQ(memcmp(result_buffer, buffer_swapped + 16 - sizeof(d4), sizeof(d4)), 0);
+
+  size = ParquetPlainEncoder::Encode(d8, sizeof(d8), result_buffer);
+  ASSERT_EQ(size, sizeof(d8));
+  ASSERT_EQ(memcmp(result_buffer, buffer_swapped + 16 - sizeof(d8), sizeof(d8)), 0);
+
+  size = ParquetPlainEncoder::Encode(d16, sizeof(d16), result_buffer);
+  ASSERT_EQ(size, sizeof(d16));
+  ASSERT_EQ(memcmp(result_buffer, buffer_swapped + 16 - sizeof(d16), sizeof(d16)), 0);
+}
+
+/// Test that corrupt strings are handled correctly.
+TEST(PlainEncoding, CorruptString) {
+  // Test string with negative length.
+  uint8_t buffer[sizeof(int32_t) + 10];
+  int32_t len = -10;
+  memcpy(buffer, &len, sizeof(int32_t));
+
+  StringValue result;
+  int decoded_size = ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>(
+      buffer, buffer + sizeof(buffer), 0, &result);
+  EXPECT_EQ(decoded_size, -1);
+}
+
+}
+
+IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-scratch-tuple-batch.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-scratch-tuple-batch.h b/be/src/exec/parquet/parquet-scratch-tuple-batch.h
new file mode 100644
index 0000000..1b79be1
--- /dev/null
+++ b/be/src/exec/parquet/parquet-scratch-tuple-batch.h
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H
+#define IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H
+
+#include "runtime/descriptors.h"
+#include "runtime/row-batch.h"
+#include "runtime/tuple-row.h"
+
+namespace impala {
+
+/// Helper struct that holds a batch of tuples allocated from a mem pool, as well
+/// as state associated with iterating over its tuples and transferring
+/// them to an output batch in TransferScratchTuples().
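+///
+/// Typical lifecycle, as a rough sketch (the materialization and predicate
+/// evaluation steps below stand in for the scanner's actual logic):
+///
+///   ScratchTupleBatch scratch(row_desc, state->batch_size(), mem_tracker);
+///   RETURN_IF_ERROR(scratch.Reset(state));
+///   // ... materialize up to 'capacity' tuples into 'tuple_mem' ...
+///   while (!scratch.AtEnd()) {
+///     // ... evaluate conjuncts, append surviving tuples to the output batch ...
+///     scratch.FinalizeTupleTransfer(dst_batch, num_to_commit);
+///   }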
+struct ScratchTupleBatch {
+  // Memory for the fixed-length parts of the batch of tuples. Allocated from
+  // 'tuple_mem_pool'. Set to NULL when transferred to an output batch.
+  uint8_t* tuple_mem = nullptr;
+  // Number of tuples that can be stored in 'tuple_mem'.
+  int capacity;
+  // Keeps track of the current tuple index.
+  int tuple_idx = 0;
+  // Number of valid tuples in tuple_mem.
+  int num_tuples = 0;
+  // Number of tuples transferred to output batches (i.e. not filtered by predicates).
+  // num_tuples_transferred > 0 before a call to FinalizeTupleTransfer() implies that
+  // tuples from the current scratch batch were transferred to a previous output batch.
+  int num_tuples_transferred = 0;
+  // Bytes of fixed-length data per tuple.
+  const int tuple_byte_size;
+
+  // Pool used to allocate 'tuple_mem' and nothing else.
+  MemPool tuple_mem_pool;
+
+  // Pool used to accumulate other memory that may be referenced by var-len slots in this
+  // batch, e.g. decompression buffers, allocations for var-len strings and allocations
+  // for nested arrays. This memory may be referenced by previous batches or the current
+  // batch, but not by future batches. E.g. a decompression buffer can be safely attached
+  // only once all values referencing that buffer have been materialized into the batch.
+  MemPool aux_mem_pool;
+
+  // Tuples transferred to an output row batch are compacted if
+  // (# tuples materialized / # tuples returned) exceeds this number. Chosen so that
+  // the cost of copying the tuples is very small relative to the original cost of
+  // materializing them.
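+  // E.g. with a value of 16, a scratch batch of 1024 tuples is compacted only if
+  // at most 64 of them survive into the output batch.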
+  const int MIN_SELECTIVITY_TO_COMPACT = 16;
+
+  ScratchTupleBatch(
+      const RowDescriptor& row_desc, int batch_size, MemTracker* mem_tracker)
+    : capacity(batch_size),
+      tuple_byte_size(row_desc.GetRowSize()),
+      tuple_mem_pool(mem_tracker),
+      aux_mem_pool(mem_tracker) {
+    DCHECK_EQ(row_desc.tuple_descriptors().size(), 1);
+  }
+
+  Status Reset(RuntimeState* state) {
+    tuple_idx = 0;
+    num_tuples = 0;
+    num_tuples_transferred = 0;
+    if (tuple_mem == nullptr) {
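+      // Allocate 'tuple_mem' lazily. The call may adjust 'capacity'; 'dummy'
+      // receives the buffer's byte size, which is not needed here.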
+      int64_t dummy;
+      RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(
+          state, &tuple_mem_pool, tuple_byte_size, &capacity, &dummy, &tuple_mem));
+    }
+    return Status::OK();
+  }
+
+  /// Release all memory in the MemPools. If 'dst_pool' is non-NULL, transfers the
+  /// memory to 'dst_pool'. Otherwise frees the memory.
+  void ReleaseResources(MemPool* dst_pool) {
+    if (dst_pool == nullptr) {
+      tuple_mem_pool.FreeAll();
+      aux_mem_pool.FreeAll();
+    } else {
+      dst_pool->AcquireData(&tuple_mem_pool, false);
+      dst_pool->AcquireData(&aux_mem_pool, false);
+    }
+    tuple_mem = nullptr;
+  }
+
+  /// Finalize the transfer of 'num_to_commit' tuples to 'dst_batch', and transfer
+  /// memory to 'dst_batch' if this scratch batch has reached its end. The tuples
+  /// must not yet be committed to 'dst_batch'. Only needs to be called when
+  /// materializing non-empty tuples.
+  void FinalizeTupleTransfer(RowBatch* dst_batch, int num_to_commit) {
+    DCHECK_GE(num_to_commit, 0);
+    DCHECK_LE(dst_batch->num_rows() + num_to_commit, dst_batch->capacity());
+    DCHECK_LE(num_tuples_transferred + num_to_commit, num_tuples);
+    DCHECK(tuple_mem != nullptr);
+    num_tuples_transferred += num_to_commit;
+    if (!AtEnd()) return;
+    // We're at the end of the scratch batch. Transfer memory that may be referenced by
+    // transferred tuples or that we can't reuse to 'dst_batch'.
+
+    // Future tuples won't reference data in 'aux_mem_pool' - always transfer so that
+    // we don't accumulate unneeded memory in the scratch batch.
+    dst_batch->tuple_data_pool()->AcquireData(&aux_mem_pool, false);
+
+    // Try to avoid the transfer of 'tuple_mem' for selective scans by compacting the
+    // output batch. This avoids excessive allocation and transfer of memory, which
+    // can lead to performance problems like IMPALA-4923.
+    // Compaction is unsafe if the scratch batch was split across multiple output batches
+    // because the batch we returned earlier may hold a reference into 'tuple_mem'.
+    if (num_tuples_transferred > num_to_commit
+        || num_tuples_transferred * MIN_SELECTIVITY_TO_COMPACT > num_tuples
+        || !TryCompact(dst_batch, num_to_commit)) {
+      // Didn't compact - rows in 'dst_batch' reference 'tuple_mem'.
+      dst_batch->tuple_data_pool()->AcquireData(&tuple_mem_pool, false);
+      tuple_mem = nullptr;
+    }
+  }
+
+  /// Try to compact 'num_uncommitted_tuples' uncommitted tuples that were added to
+  /// the end of 'dst_batch' by copying them to memory allocated from
+  /// dst_batch->tuple_data_pool(). Returns true on success or false if the memory
+  /// could not be allocated.
+  bool TryCompact(RowBatch* dst_batch, int num_uncommitted_tuples) {
+    DCHECK_LE(dst_batch->num_rows() + num_uncommitted_tuples, dst_batch->capacity());
+    // Copy rows that reference 'tuple_mem' into a new small buffer. This also handles
+    // the case where num_uncommitted_tuples == 0, since TryAllocate() still returns a
+    // non-null pointer in that case.
+    int64_t dst_bytes = num_uncommitted_tuples * static_cast<int64_t>(tuple_byte_size);
+    uint8_t* dst_buffer = dst_batch->tuple_data_pool()->TryAllocate(dst_bytes);
+    if (dst_buffer == nullptr) return false;
+    const int end_row = dst_batch->num_rows() + num_uncommitted_tuples;
+    for (int i = dst_batch->num_rows(); i < end_row; ++i) {
+      TupleRow* row = dst_batch->GetRow(i);
+      Tuple* uncompacted_tuple = row->GetTuple(0);
+      DCHECK_GE(reinterpret_cast<uint8_t*>(uncompacted_tuple), tuple_mem);
+      DCHECK_LT(reinterpret_cast<uint8_t*>(uncompacted_tuple),
+          tuple_mem + tuple_byte_size * capacity);
+      row->SetTuple(0, reinterpret_cast<Tuple*>(dst_buffer));
+      memcpy(dst_buffer, uncompacted_tuple, tuple_byte_size);
+      dst_buffer += tuple_byte_size;
+    }
+    return true;
+  }
+
+  Tuple* GetTuple(int idx) const {
+    return reinterpret_cast<Tuple*>(tuple_mem + idx * tuple_byte_size);
+  }
+
+  uint8_t* CurrTuple() const { return tuple_mem + tuple_idx * tuple_byte_size; }
+  uint8_t* TupleEnd() const { return tuple_mem + num_tuples * tuple_byte_size; }
+  bool AtEnd() const { return tuple_idx == num_tuples; }
+  int64_t total_allocated_bytes() const {
+    return tuple_mem_pool.total_allocated_bytes() + aux_mem_pool.total_allocated_bytes();
+  }
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-version-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-version-test.cc b/be/src/exec/parquet/parquet-version-test.cc
new file mode 100644
index 0000000..5eaa692
--- /dev/null
+++ b/be/src/exec/parquet/parquet-version-test.cc
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <iostream>
+
+#include "exec/parquet/parquet-metadata-utils.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+void CheckVersionParse(const string& s, const string& expected_application,
+    int expected_major, int expected_minor, int expected_patch,
+    bool expected_is_internal) {
+  ParquetFileVersion v(s);
+  EXPECT_EQ(v.application, expected_application) << "String: " << s;
+  EXPECT_EQ(v.version.major, expected_major) << "String: " << s;
+  EXPECT_EQ(v.version.minor, expected_minor) << "String: " << s;
+  EXPECT_EQ(v.version.patch, expected_patch) << "String: " << s;
+  EXPECT_EQ(v.is_impala_internal, expected_is_internal) << "String: " << s;
+}
+
+TEST(ParquetVersionTest, Parsing) {
+  CheckVersionParse("impala version 1.0", "impala", 1, 0, 0, false);
+  CheckVersionParse("impala VERSION 1.0", "impala", 1, 0, 0, false);
+  CheckVersionParse("impala VERSION 1.0 ignored", "impala", 1, 0, 0, false);
+  CheckVersionParse("parquet-mr version 2.0", "parquet-mr", 2, 0, 0, false);
+
+  CheckVersionParse("impala version 1.2", "impala", 1, 2, 0, false);
+  CheckVersionParse("impala version 1.2.3", "impala", 1, 2, 3, false);
+  CheckVersionParse("impala version 1.2.3-cdh4.5", "impala", 1, 2, 3, false);
+  CheckVersionParse("impala version 1.2.3.cdh4.5", "impala", 1, 2, 3, false);
+  CheckVersionParse("impala version 1.2-cdh4.5", "impala", 1, 2, 0, false);
+  CheckVersionParse("impala version 1.2.cdh4.5", "impala", 1, 2, 0, false);
+  CheckVersionParse("impala version 1.2 (build xyz)", "impala", 1, 2, 0, false);
+  CheckVersionParse("impala version cdh4.5", "impala", 0, 0, 0, false);
+
+  // Test internal versions
+  CheckVersionParse("impala version 1.0-internal", "impala", 1, 0, 0, true);
+  CheckVersionParse("impala version 1.23-internal", "impala", 1, 23, 0, true);
+  CheckVersionParse("impala version 2-inTERnal", "impala", 2, 0, 0, true);
+  CheckVersionParse("mr version 1-internal", "mr", 1, 0, 0, false);
+
+  // Test some malformed strings.
+  CheckVersionParse("parquet-mr 2.0", "parquet-mr", 0, 0, 0, false);
+  CheckVersionParse("impala ve 2.0", "impala", 0, 0, 0, false);
+  CheckVersionParse("", "", 0, 0, 0, false);
+}
+
+TEST(ParquetVersionTest, Comparisons) {
+  ParquetFileVersion v("foo version 1.2.3");
+  EXPECT_TRUE(v.VersionEq(1, 2, 3));
+  EXPECT_FALSE(v.VersionEq(1, 2, 4));
+  EXPECT_TRUE(v.VersionLt(3, 2, 1));
+  EXPECT_TRUE(v.VersionLt(1, 2, 4));
+  EXPECT_TRUE(v.VersionLt(2, 0, 0));
+  EXPECT_FALSE(v.VersionLt(0, 0, 0));
+  EXPECT_FALSE(v.VersionLt(1, 2, 3));
+  EXPECT_FALSE(v.VersionLt(1, 2, 2));
+  EXPECT_FALSE(v.VersionLt(0, 4, 4));
+}
+
+}
+
+IMPALA_TEST_MAIN();
+

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index dd70b36..74347cd 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -31,6 +31,7 @@ add_library(Util
   backend-gflag-util.cc
   benchmark.cc
   bitmap.cc
+  bit-packing.cc
   bit-util.cc
   bloom-filter.cc
   bloom-filter-ir.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-packing-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing-test.cc b/be/src/util/bit-packing-test.cc
index bedf178..f1ec10a 100644
--- a/be/src/util/bit-packing-test.cc
+++ b/be/src/util/bit-packing-test.cc
@@ -21,7 +21,7 @@
 
 #include "testutil/gtest-util.h"
 #include "testutil/mem-util.h"
-#include "util/bit-packing.inline.h"
+#include "util/bit-packing.h"
 #include "util/bit-stream-utils.inline.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-packing.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.cc b/be/src/util/bit-packing.cc
new file mode 100644
index 0000000..cb3233d
--- /dev/null
+++ b/be/src/util/bit-packing.cc
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/bit-packing.inline.h"
+
+#include "runtime/decimal-value.h"
+#include "runtime/string-value.h"
+#include "runtime/timestamp-value.h"
+
+namespace impala {
+
+// Instantiate all of the templated functions needed by the rest of Impala.
+#define INSTANTIATE_UNPACK_VALUES(OUT_TYPE)                                       \
+  template std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues<OUT_TYPE>( \
+      int bit_width, const uint8_t* __restrict__ in, int64_t in_bytes,            \
+      int64_t num_values, OUT_TYPE* __restrict__ out);
+
+INSTANTIATE_UNPACK_VALUES(bool);
+INSTANTIATE_UNPACK_VALUES(uint8_t);
+INSTANTIATE_UNPACK_VALUES(uint32_t);
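+
+// With these instantiations in place, callers can use UnpackValues while only
+// including bit-packing.h. Hypothetical caller:
+//
+//   uint32_t out[256];
+//   std::pair<const uint8_t*, int64_t> result =
+//       BitPacking::UnpackValues<uint32_t>(5, in, in_bytes, 256, out);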
+
+#define INSTANTIATE_UNPACK_AND_DECODE(OUT_TYPE)                                         \
+  template std::pair<const uint8_t*, int64_t>                                           \
+  BitPacking::UnpackAndDecodeValues<OUT_TYPE>(int bit_width,                            \
+      const uint8_t* __restrict__ in, int64_t in_bytes, OUT_TYPE* __restrict__ dict,    \
+      int64_t dict_len, int64_t num_values, OUT_TYPE* __restrict__ out, int64_t stride, \
+      bool* __restrict__ decode_error);
+
+INSTANTIATE_UNPACK_AND_DECODE(bool);
+INSTANTIATE_UNPACK_AND_DECODE(double);
+INSTANTIATE_UNPACK_AND_DECODE(float);
+INSTANTIATE_UNPACK_AND_DECODE(int8_t);
+INSTANTIATE_UNPACK_AND_DECODE(int16_t);
+INSTANTIATE_UNPACK_AND_DECODE(int32_t);
+INSTANTIATE_UNPACK_AND_DECODE(int64_t);
+INSTANTIATE_UNPACK_AND_DECODE(Decimal4Value);
+INSTANTIATE_UNPACK_AND_DECODE(Decimal8Value);
+INSTANTIATE_UNPACK_AND_DECODE(Decimal16Value);
+INSTANTIATE_UNPACK_AND_DECODE(StringValue);
+INSTANTIATE_UNPACK_AND_DECODE(TimestampValue);
+
+// Required for bit-packing-benchmark.cc.
+template
+const uint8_t* BitPacking::Unpack32Values(int bit_width, const uint8_t* __restrict__ in,
+    int64_t in_bytes, uint32_t* __restrict__ out);
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-packing.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.h b/be/src/util/bit-packing.h
index 38b39e2..c70b55e 100644
--- a/be/src/util/bit-packing.h
+++ b/be/src/util/bit-packing.h
@@ -15,15 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef IMPALA_UTIL_BIT_PACKING_H
-#define IMPALA_UTIL_BIT_PACKING_H
-
-namespace impala {
+#pragma once
 
+#include <cstddef>
 #include <cstdint>
-
 #include <utility>
 
+namespace impala {
+
 /// Utilities for manipulating bit-packed values. Bit-packing is a technique for
 /// compressing integer values that do not use the full range of the integer type.
 /// E.g. an array of uint32_t values with range [0, 31] only uses the lower 5 bits
@@ -131,5 +130,3 @@ class BitPacking {
   static int64_t NumValuesToUnpack(int bit_width, int64_t in_bytes, int64_t num_values);
 };
 }
-
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-packing.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.inline.h b/be/src/util/bit-packing.inline.h
index 6fa31cc..8cebe40 100644
--- a/be/src/util/bit-packing.inline.h
+++ b/be/src/util/bit-packing.inline.h
@@ -15,8 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef IMPALA_UTIL_BIT_PACKING_INLINE_H
-#define IMPALA_UTIL_BIT_PACKING_INLINE_H
+// This contains all the template implementations for functions defined in bit-packing.h.
+// This should be included by files that want to instantiate those templates directly.
+// Including this file is not generally necessary - instead the templates should be
+// instantiated in bit-packing.cc so that compile times stay manageable.
+
+#pragma once
 
 #include "util/bit-packing.h"
 
@@ -345,6 +349,4 @@ const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict
   return in + BYTES_TO_READ;
 #pragma pop_macro("DECODE_VALUES_CASE")
 }
-}
-
-#endif
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h
index aa53c52..48f52da 100644
--- a/be/src/util/bit-stream-utils.inline.h
+++ b/be/src/util/bit-stream-utils.inline.h
@@ -21,7 +21,7 @@
 #include "util/bit-stream-utils.h"
 
 #include "common/compiler-util.h"
-#include "util/bit-packing.inline.h"
+#include "util/bit-packing.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index bf40301..cc7ef82 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -23,7 +23,7 @@
 #include <boost/unordered_map.hpp>
 
 #include "common/compiler-util.h"
-#include "exec/parquet-common.h"
+#include "exec/parquet/parquet-common.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/mem-pool.h"
 #include "runtime/string-value.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index ecf24f5..c30c30b 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -25,6 +25,7 @@
 #include "runtime/string-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "testutil/gtest-util.h"
+#include "util/bit-packing.inline.h"
 #include "util/dict-encoding.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index a50838c..8bb7665 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -35,7 +35,7 @@
 #include <thrift/transport/TBufferTransports.h>
 #pragma clang diagnostic pop
 
-#include "exec/parquet-common.h"
+#include "exec/parquet/parquet-common.h"
 #include "runtime/mem-pool.h"
 #include "util/codec.h"
 #include "util/rle-encoding.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc
index c52659b..4406e46 100644
--- a/be/src/util/rle-test.cc
+++ b/be/src/util/rle-test.cc
@@ -23,8 +23,9 @@
 #include <math.h>
 
 #include "testutil/gtest-util.h"
-#include "util/rle-encoding.h"
+#include "util/bit-packing.inline.h"
 #include "util/bit-stream-utils.inline.h"
+#include "util/rle-encoding.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 768a569..d8d40b9 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -370,6 +370,9 @@ error_codes = (
   ("PARQUET_TIMESTAMP_INVALID_TIME_OF_DAY", 121,
    "Parquet file '$0' column '$1' contains a timestamp with invalid time of day. "
    "The time of day should be 0 <= and < 24 hour (in nanoseconds)."),
+
+  ("PARQUET_CORRUPT_BOOL_VALUE", 122, "File '$0' is corrupt: error decoding BOOLEAN "
+   "value with encoding $1 at offset $2"),
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test
index 47fbc1a..3feb7c2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test
@@ -2,5 +2,5 @@
 ---- QUERY
 select * from num_values_def_levels_mismatch
 ---- CATCH
-could not read all def levels for column '_c0'
+Could not read definition level, even though metadata states there are 1 values remaining in data page.
 ====