You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/11/27 02:01:52 UTC
[02/14] impala git commit: IMPALA-7869: break up
parquet-column-readers.cc
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-level-decoder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-level-decoder.h b/be/src/exec/parquet/parquet-level-decoder.h
new file mode 100644
index 0000000..2e0c24e
--- /dev/null
+++ b/be/src/exec/parquet/parquet-level-decoder.h
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
#include <cstdint>
#include <limits>
#include <string>

#include "common/status.h"
#include "gen-cpp/parquet_types.h"
#include "util/rle-encoding.h"
+
+namespace impala {
+
+class MemPool;
+
/// Constants used instead of actual levels to indicate special conditions.
/// All of them are negative, so none can collide with a decoded level (levels are
/// stored as uint8_t by ParquetLevelDecoder).
class ParquetLevel {
 public:
  /// The rep and def levels are set to this value to indicate the end of a row group.
  static constexpr int16_t ROW_GROUP_END = std::numeric_limits<int16_t>::min();
  /// Indicates an invalid definition or repetition level.
  static constexpr int16_t INVALID_LEVEL = -1;
  /// Indicates an invalid position value.
  static constexpr int16_t INVALID_POS = -1;
};
+
+/// Decoder for encoded Parquet levels. Only supports the RLE encoding, not the deprecated
+/// BIT_PACKED encoding. Optionally reads, decodes, and caches level values in batches.
+/// Level values are unsigned 8-bit integers because we support a maximum nesting
+/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
+/// populating the level cache (e.g., with RLE we can memset() repeated values).
+class ParquetLevelDecoder {
+ public:
+ ParquetLevelDecoder(bool is_def_level_decoder)
+ : decoding_error_code_(is_def_level_decoder ? TErrorCode::PARQUET_DEF_LEVEL_ERROR :
+ TErrorCode::PARQUET_REP_LEVEL_ERROR) {}
+
+ /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
+ /// encoding requires reading metadata from the page header. 'cache_size' will be
+ /// rounded up to a multiple of 32 internally.
+ Status Init(const string& filename, parquet::Encoding::type encoding,
+ MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size);
+
+ /// Returns the next level or INVALID_LEVEL if there was an error. Not as efficient
+ /// as batched methods.
+ inline int16_t ReadLevel();
+
+ /// If the next value is part of a repeated run and is not cached, return the length
+ /// of the repeated run. A max level of 0 is treated as an arbitrarily long run of
+ /// zeroes, so this returns numeric_limits<int32_t>::max(). Otherwise return 0.
+ inline int32_t NextRepeatedRunLength();
+
+ /// Get the value of the repeated run (if NextRepeatedRunLength() > 0) and consume
+ /// 'num_to_consume' items in the run. Not valid to call if there are cached levels
+ /// that have not been consumed.
+ inline uint8_t GetRepeatedValue(uint32_t num_to_consume);
+
+ /// Decodes and caches the next batch of levels given that there are 'vals_remaining'
+ /// values left to decode in the page. Resets members associated with the cache.
+ /// Returns a non-ok status if there was a problem decoding a level, if a level was
+ /// encountered with a value greater than max_level_, or if fewer than
+ /// min(CacheSize(), vals_remaining) levels could be read, which indicates that the
+ /// input did not have the expected number of values. Only valid to call when
+ /// the cache has been exhausted, i.e. CacheHasNext() is false.
+ Status CacheNextBatch(int vals_remaining);
+
+ /// Functions for working with the level cache.
+ bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }
+ uint8_t CacheGetNext() {
+ DCHECK_LT(cached_level_idx_, num_cached_levels_);
+ return cached_levels_[cached_level_idx_++];
+ }
+ void CacheSkipLevels(int num_levels) {
+ DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_);
+ cached_level_idx_ += num_levels;
+ }
+ int CacheSize() const { return num_cached_levels_; }
+ int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
+ int CacheCurrIdx() const { return cached_level_idx_; }
+
+ private:
+ /// Initializes members associated with the level cache. Allocates memory for
+ /// the cache from pool, if necessary.
+ Status InitCache(MemPool* pool, int cache_size);
+
+ /// Decodes and writes a batch of levels into the cache. Returns true and sets
+ /// the number of values written to the cache via *num_cached_levels if no errors
+ /// are encountered. *num_cached_levels is < 'batch_size' in this case iff the
+ /// end of input was hit without any other errors. Returns false if there was an
+ /// error decoding a level or if there was an invalid level value greater than
+ /// 'max_level_'. Only valid to call when the cache has been exhausted, i.e.
+ /// CacheHasNext() is false.
+ bool FillCache(int batch_size, int* num_cached_levels);
+
+ /// RLE decoder, used if max_level_ > 0.
+ RleBatchDecoder<uint8_t> rle_decoder_;
+
+ /// Buffer for a batch of levels. The memory is allocated and owned by a pool passed
+ /// in Init().
+ uint8_t* cached_levels_ = nullptr;
+
+ /// Number of valid level values in the cache.
+ int num_cached_levels_ = 0;
+
+ /// Current index into cached_levels_.
+ int cached_level_idx_ = 0;
+
+ /// For error checking and reporting.
+ int max_level_ = 0;
+
+ /// Number of level values cached_levels_ has memory allocated for. Always
+ /// a multiple of 32 to allow reading directly from 'bit_reader_' in batches.
+ int cache_size_ = 0;
+
+ /// Name of the parquet file. Used for reporting level decoding errors.
+ string filename_;
+
+ /// Error code to use when reporting level decoding errors.
+ TErrorCode::type decoding_error_code_;
+};
+
+inline int16_t ParquetLevelDecoder::ReadLevel() {
+ if (UNLIKELY(!CacheHasNext())) {
+ if (UNLIKELY(!FillCache(cache_size_, &num_cached_levels_))) {
+ return ParquetLevel::INVALID_LEVEL;
+ }
+ DCHECK_GE(num_cached_levels_, 0);
+ if (UNLIKELY(num_cached_levels_ == 0)) {
+ return ParquetLevel::INVALID_LEVEL;
+ }
+ }
+ return CacheGetNext();
+}
+
+inline int32_t ParquetLevelDecoder::NextRepeatedRunLength() {
+ if (CacheHasNext()) return 0;
+ // Treat always-zero levels as an infinitely long run of zeroes. Return the maximum
+ // run length allowed by the Parquet standard.
+ if (max_level_ == 0) return numeric_limits<int32_t>::max();
+ return rle_decoder_.NextNumRepeats();
+}
+
+inline uint8_t ParquetLevelDecoder::GetRepeatedValue(uint32_t num_to_consume) {
+ DCHECK(!CacheHasNext());
+ // Treat always-zero levels as an infinitely long run of zeroes.
+ if (max_level_ == 0) return 0;
+ return rle_decoder_.GetRepeatedValue(num_to_consume);
+}
+
+} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc b/be/src/exec/parquet/parquet-metadata-utils.cc
new file mode 100644
index 0000000..7cbfeda
--- /dev/null
+++ b/be/src/exec/parquet/parquet-metadata-utils.cc
@@ -0,0 +1,733 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet/parquet-metadata-utils.h"
+
+#include <strings.h>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <boost/algorithm/string.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "exec/parquet/parquet-column-stats.h"
+#include "exec/parquet/parquet-common.h"
+#include "runtime/runtime-state.h"
+#include "util/debug-util.h"
+
+#include "common/names.h"
+
+using boost::algorithm::is_any_of;
+using boost::algorithm::split;
+using boost::algorithm::token_compress_on;
+
+namespace impala {
+
+namespace {
+
+const map<PrimitiveType, set<parquet::Type::type>> SUPPORTED_PHYSICAL_TYPES = {
+ {PrimitiveType::INVALID_TYPE, {parquet::Type::BOOLEAN}},
+ {PrimitiveType::TYPE_NULL, {parquet::Type::BOOLEAN}},
+ {PrimitiveType::TYPE_BOOLEAN, {parquet::Type::BOOLEAN}},
+ {PrimitiveType::TYPE_TINYINT, {parquet::Type::INT32}},
+ {PrimitiveType::TYPE_SMALLINT, {parquet::Type::INT32}},
+ {PrimitiveType::TYPE_INT, {parquet::Type::INT32}},
+ {PrimitiveType::TYPE_BIGINT, {parquet::Type::INT32, parquet::Type::INT64}},
+ {PrimitiveType::TYPE_FLOAT, {parquet::Type::FLOAT}},
+ {PrimitiveType::TYPE_DOUBLE, {parquet::Type::INT32, parquet::Type::FLOAT,
+ parquet::Type::DOUBLE}},
+ {PrimitiveType::TYPE_TIMESTAMP, {parquet::Type::INT96}},
+ {PrimitiveType::TYPE_STRING, {parquet::Type::BYTE_ARRAY}},
+ {PrimitiveType::TYPE_DATE, {parquet::Type::BYTE_ARRAY}},
+ {PrimitiveType::TYPE_DATETIME, {parquet::Type::BYTE_ARRAY}},
+ {PrimitiveType::TYPE_BINARY, {parquet::Type::BYTE_ARRAY}},
+ {PrimitiveType::TYPE_DECIMAL, {parquet::Type::INT32, parquet::Type::INT64,
+ parquet::Type::FIXED_LEN_BYTE_ARRAY, parquet::Type::BYTE_ARRAY}},
+ {PrimitiveType::TYPE_CHAR, {parquet::Type::BYTE_ARRAY}},
+ {PrimitiveType::TYPE_VARCHAR, {parquet::Type::BYTE_ARRAY}},
+};
+
+/// Physical types that are only supported with specific converted types.
+const map<PrimitiveType, set<pair<parquet::Type::type, parquet::ConvertedType::type>>>
+ SUPPORTED_CONVERTED_TYPES = {
+ {PrimitiveType::TYPE_TIMESTAMP,
+ {{parquet::Type::INT64, parquet::ConvertedType::TIMESTAMP_MICROS},
+ {parquet::Type::INT64, parquet::ConvertedType::TIMESTAMP_MILLIS}}}};
+};
+
+/// Returns true if 'parquet_type' is a supported physical encoding for the Impala
+/// primitive type, false otherwise. Some physical types are accepted only for certain
+/// converted types.
+bool IsSupportedType(PrimitiveType impala_type,
+ const parquet::SchemaElement& element) {
+ auto encodings = SUPPORTED_PHYSICAL_TYPES.find(impala_type);
+ DCHECK(encodings != SUPPORTED_PHYSICAL_TYPES.end());
+ parquet::Type::type parquet_type = element.type;
+ if (encodings->second.find(parquet_type) != encodings->second.end()) return true;
+
+ if(!element.__isset.converted_type) return false;
+ parquet::ConvertedType::type converted_type = element.converted_type;
+ auto converted_types = SUPPORTED_CONVERTED_TYPES.find(impala_type);
+ if (converted_types == SUPPORTED_CONVERTED_TYPES.end()) return false;
+ if (converted_types->second.find({parquet_type, converted_type})
+ != converted_types->second.end()) return true;
+
+ return false;
+}
+
+// Needs to be in sync with the order of enum values declared in TParquetArrayResolution.
+const std::vector<ParquetSchemaResolver::ArrayEncoding>
+ ParquetSchemaResolver::ORDERED_ARRAY_ENCODINGS[] =
+ {{ParquetSchemaResolver::THREE_LEVEL, ParquetSchemaResolver::ONE_LEVEL},
+ {ParquetSchemaResolver::TWO_LEVEL, ParquetSchemaResolver::ONE_LEVEL},
+ {ParquetSchemaResolver::TWO_LEVEL, ParquetSchemaResolver::THREE_LEVEL,
+ ParquetSchemaResolver::ONE_LEVEL}};
+
+Status ParquetMetadataUtils::ValidateFileVersion(
+ const parquet::FileMetaData& file_metadata, const char* filename) {
+ if (file_metadata.version > PARQUET_CURRENT_VERSION) {
+ stringstream ss;
+ ss << "File: " << filename << " is of an unsupported version. "
+ << "file version: " << file_metadata.version;
+ return Status(ss.str());
+ }
+ return Status::OK();
+}
+
+Status ParquetMetadataUtils::ValidateColumnOffsets(const string& filename,
+ int64_t file_length, const parquet::RowGroup& row_group) {
+ for (int i = 0; i < row_group.columns.size(); ++i) {
+ const parquet::ColumnChunk& col_chunk = row_group.columns[i];
+ RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length,
+ col_chunk.meta_data.data_page_offset, "data page offset"));
+ int64_t col_start = col_chunk.meta_data.data_page_offset;
+ // The file format requires that if a dictionary page exists, it be before data pages.
+ if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+ RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length,
+ col_chunk.meta_data.dictionary_page_offset, "dictionary page offset"));
+ if (col_chunk.meta_data.dictionary_page_offset >= col_start) {
+ return Status(Substitute("Parquet file '$0': metadata is corrupt. Dictionary "
+ "page (offset=$1) must come before any data pages (offset=$2).",
+ filename, col_chunk.meta_data.dictionary_page_offset, col_start));
+ }
+ col_start = col_chunk.meta_data.dictionary_page_offset;
+ }
+ int64_t col_len = col_chunk.meta_data.total_compressed_size;
+ int64_t col_end = col_start + col_len;
+ if (col_end <= 0 || col_end > file_length) {
+ return Status(Substitute("Parquet file '$0': metadata is corrupt. Column $1 has "
+ "invalid column offsets (offset=$2, size=$3, file_size=$4).", filename, i,
+ col_start, col_len, file_length));
+ }
+ }
+ return Status::OK();
+}
+
+Status ParquetMetadataUtils::ValidateOffsetInFile(const string& filename, int col_idx,
+ int64_t file_length, int64_t offset, const string& offset_name) {
+ if (offset < 0 || offset >= file_length) {
+ return Status(Substitute("File '$0': metadata is corrupt. Column $1 has invalid "
+ "$2 (offset=$3 file_size=$4).", filename, col_idx, offset_name, offset,
+ file_length));
+ }
+ return Status::OK();;
+}
+
+static bool IsEncodingSupported(parquet::Encoding::type e) {
+ switch (e) {
+ case parquet::Encoding::PLAIN:
+ case parquet::Encoding::PLAIN_DICTIONARY:
+ case parquet::Encoding::BIT_PACKED:
+ case parquet::Encoding::RLE:
+ return true;
+ default:
+ return false;
+ }
+}
+
+Status ParquetMetadataUtils::ValidateRowGroupColumn(
+ const parquet::FileMetaData& file_metadata, const char* filename, int row_group_idx,
+ int col_idx, const parquet::SchemaElement& schema_element, RuntimeState* state) {
+ const parquet::ColumnMetaData& col_chunk_metadata =
+ file_metadata.row_groups[row_group_idx].columns[col_idx].meta_data;
+
+ // Check the encodings are supported.
+ const vector<parquet::Encoding::type>& encodings = col_chunk_metadata.encodings;
+ for (int i = 0; i < encodings.size(); ++i) {
+ if (!IsEncodingSupported(encodings[i])) {
+ return Status(Substitute("File '$0' uses an unsupported encoding: $1 for column "
+ "'$2'.", filename, PrintThriftEnum(encodings[i]), schema_element.name));
+ }
+ }
+
+ // Check the compression is supported.
+ if (col_chunk_metadata.codec != parquet::CompressionCodec::UNCOMPRESSED &&
+ col_chunk_metadata.codec != parquet::CompressionCodec::SNAPPY &&
+ col_chunk_metadata.codec != parquet::CompressionCodec::GZIP) {
+ return Status(Substitute("File '$0' uses an unsupported compression: $1 for column "
+ "'$2'.", filename, col_chunk_metadata.codec, schema_element.name));
+ }
+
+ if (col_chunk_metadata.type != schema_element.type) {
+ return Status(Substitute("Mismatched column chunk Parquet type in file '$0' column "
+ "'$1'. Expected $2 actual $3: file may be corrupt", filename,
+ schema_element.name, col_chunk_metadata.type, schema_element.type));
+ }
+ return Status::OK();
+}
+
/// Validates the file's schema element for a scalar slot against the table's slot type.
/// Slots of complex type are accepted without any checks.
///
/// Hard errors (returned directly): the physical type is not supported for the slot's
/// type (IsSupportedType()). For DECIMAL slots, the error cases are:
///   - an INT32/INT64 physical type whose size differs from the slot's byte size;
///   - a FIXED_LEN_BYTE_ARRAY with no type_length, or a wrong one;
///   - a missing scale, or a scale different from the table's.
/// Soft problems (missing or different precision, missing DECIMAL converted type,
/// decimal metadata on a non-DECIMAL slot) go through state->LogOrReturnError(). Per
/// the comment below, that fails the query only when abort_on_error is set and
/// otherwise logs a warning. The checks run in a fixed order, so the first failing
/// check determines the reported error.
Status ParquetMetadataUtils::ValidateColumn(const char* filename,
    const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
    RuntimeState* state) {
  // Following validation logic is only for non-complex types.
  if (slot_desc->type().IsComplexType()) return Status::OK();

  if (UNLIKELY(!IsSupportedType(slot_desc->type().type, schema_element))) {
    return Status(Substitute("Unsupported Parquet type in file '$0' metadata. Logical "
        "type: $1, physical type: $2. File may be corrupt.",
        filename, slot_desc->type().type, schema_element.type));
  }

  // Check the decimal scale in the file matches the metastore scale and precision.
  // We fail the query if the metadata makes it impossible for us to safely read
  // the file. If we don't require the metadata, we will fail the query if
  // abort_on_error is true, otherwise we will just log a warning.
  bool is_converted_type_decimal = schema_element.__isset.converted_type
      && schema_element.converted_type == parquet::ConvertedType::DECIMAL;
  if (slot_desc->type().type == TYPE_DECIMAL) {
    // Integer-backed decimals must have exactly the slot's byte size; no widening yet.
    // TODO: allow converting to wider type (IMPALA-2515)
    if (schema_element.type == parquet::Type::INT32 &&
        sizeof(int32_t) != slot_desc->type().GetByteSize()) {
      return Status(Substitute("File '$0' decimal column '$1' is stored as INT32, but "
          "based on the precision in the table metadata, another type would needed.",
          filename, schema_element.name));
    }
    if (schema_element.type == parquet::Type::INT64 &&
        sizeof(int64_t) != slot_desc->type().GetByteSize()) {
      return Status(Substitute("File '$0' decimal column '$1' is stored as INT64, but "
          "based on the precision in the table metadata, another type would needed.",
          filename, schema_element.name));
    }
    // We require that the scale and byte length be set.
    if (schema_element.type == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
      if (!schema_element.__isset.type_length) {
        return Status(Substitute("File '$0' column '$1' does not have type_length set.",
            filename, schema_element.name));
      }

      // The fixed length must equal the plain-encoded size of the table's decimal type.
      int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
      if (schema_element.type_length != expected_len) {
        return Status(Substitute("File '$0' column '$1' has an invalid type length. "
            "Expecting: $2 len in file: $3", filename, schema_element.name, expected_len,
            schema_element.type_length));
      }
    }
    if (!schema_element.__isset.scale) {
      return Status(Substitute("File '$0' column '$1' does not have the scale set.",
          filename, schema_element.name));
    }

    if (schema_element.scale != slot_desc->type().scale) {
      // TODO: we could allow a mismatch and do a conversion at this step.
      return Status(Substitute("File '$0' column '$1' has a scale that does not match "
          "the table metadata scale. File metadata scale: $2 Table metadata scale: $3",
          filename, schema_element.name, schema_element.scale, slot_desc->type().scale));
    }

    // The other decimal metadata should be there but we don't need it.
    if (!schema_element.__isset.precision) {
      ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename, schema_element.name);
      RETURN_IF_ERROR(state->LogOrReturnError(msg));
    } else {
      if (schema_element.precision != slot_desc->type().precision) {
        // TODO: we could allow a mismatch and do a conversion at this step.
        ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION, filename, schema_element.name,
            schema_element.precision, slot_desc->type().precision);
        RETURN_IF_ERROR(state->LogOrReturnError(msg));
      }
    }

    if (!is_converted_type_decimal) {
      // TODO: is this validation useful? It is not required at all to read the data and
      // might only serve to reject otherwise perfectly readable files.
      ErrorMsg msg(TErrorCode::PARQUET_BAD_CONVERTED_TYPE, filename,
          schema_element.name);
      RETURN_IF_ERROR(state->LogOrReturnError(msg));
    }
  } else if (schema_element.__isset.scale || schema_element.__isset.precision
      || is_converted_type_decimal) {
    // Decimal metadata on a column whose table type is not DECIMAL.
    ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename, schema_element.name,
        slot_desc->type().DebugString());
    RETURN_IF_ERROR(state->LogOrReturnError(msg));
  }
  return Status::OK();
}
+
+ParquetFileVersion::ParquetFileVersion(const string& created_by) {
+ string created_by_lower = created_by;
+ std::transform(created_by_lower.begin(), created_by_lower.end(),
+ created_by_lower.begin(), ::tolower);
+ is_impala_internal = false;
+
+ vector<string> tokens;
+ split(tokens, created_by_lower, is_any_of(" "), token_compress_on);
+ // Boost always creates at least one token
+ DCHECK_GT(tokens.size(), 0);
+ application = tokens[0];
+
+ if (tokens.size() >= 3 && tokens[1] == "version") {
+ string version_string = tokens[2];
+ // Ignore any trailing nodextra characters
+ int n = version_string.find_first_not_of("0123456789.");
+ string version_string_trimmed = version_string.substr(0, n);
+
+ vector<string> version_tokens;
+ split(version_tokens, version_string_trimmed, is_any_of("."));
+ version.major = version_tokens.size() >= 1 ? atoi(version_tokens[0].c_str()) : 0;
+ version.minor = version_tokens.size() >= 2 ? atoi(version_tokens[1].c_str()) : 0;
+ version.patch = version_tokens.size() >= 3 ? atoi(version_tokens[2].c_str()) : 0;
+
+ if (application == "impala") {
+ if (version_string.find("-internal") != string::npos) is_impala_internal = true;
+ }
+ } else {
+ version.major = 0;
+ version.minor = 0;
+ version.patch = 0;
+ }
+}
+
+bool ParquetFileVersion::VersionLt(int major, int minor, int patch) const {
+ if (version.major < major) return true;
+ if (version.major > major) return false;
+ DCHECK_EQ(version.major, major);
+ if (version.minor < minor) return true;
+ if (version.minor > minor) return false;
+ DCHECK_EQ(version.minor, minor);
+ return version.patch < patch;
+}
+
+bool ParquetFileVersion::VersionEq(int major, int minor, int patch) const {
+ return version.major == major && version.minor == minor && version.patch == patch;
+}
+
+static string PrintRepetitionType(const parquet::FieldRepetitionType::type& t) {
+ switch (t) {
+ case parquet::FieldRepetitionType::REQUIRED: return "required";
+ case parquet::FieldRepetitionType::OPTIONAL: return "optional";
+ case parquet::FieldRepetitionType::REPEATED: return "repeated";
+ default: return "<unknown>";
+ }
+}
+
+static string PrintParquetType(const parquet::Type::type& t) {
+ switch (t) {
+ case parquet::Type::BOOLEAN: return "boolean";
+ case parquet::Type::INT32: return "int32";
+ case parquet::Type::INT64: return "int64";
+ case parquet::Type::INT96: return "int96";
+ case parquet::Type::FLOAT: return "float";
+ case parquet::Type::DOUBLE: return "double";
+ case parquet::Type::BYTE_ARRAY: return "byte_array";
+ case parquet::Type::FIXED_LEN_BYTE_ARRAY: return "fixed_len_byte_array";
+ default: return "<unknown>";
+ }
+}
+
+string SchemaNode::DebugString(int indent) const {
+ stringstream ss;
+ for (int i = 0; i < indent; ++i) ss << " ";
+ ss << PrintRepetitionType(element->repetition_type) << " ";
+ if (element->num_children > 0) {
+ ss << "struct";
+ } else {
+ ss << PrintParquetType(element->type);
+ }
+ ss << " " << element->name << " [i:" << col_idx << " d:" << max_def_level
+ << " r:" << max_rep_level << "]";
+ if (element->num_children > 0) {
+ ss << " {" << endl;
+ for (int i = 0; i < element->num_children; ++i) {
+ ss << children[i].DebugString(indent + 2) << endl;
+ }
+ for (int i = 0; i < indent; ++i) ss << " ";
+ ss << "}";
+ }
+ return ss.str();
+}
+
+Status ParquetSchemaResolver::CreateSchemaTree(
+ const vector<parquet::SchemaElement>& schema, SchemaNode* node) const {
+ int idx = 0;
+ int col_idx = 0;
+ RETURN_IF_ERROR(CreateSchemaTree(schema, 0, 0, 0, &idx, &col_idx, node));
+ if (node->children.empty()) {
+ return Status(Substitute("Invalid file: '$0' has no columns.", filename_));
+ }
+ return Status::OK();
+}
+
/// Recursively builds the subtree for schema[*idx] into 'node'. The flattened 'schema'
/// is consumed in pre-order: the element itself, then each of its children's subtrees.
/// '*idx' is advanced past every element consumed. '*col_idx' is advanced past every
/// leaf, so leaves receive consecutive column indexes.
/// 'max_def_level', 'max_rep_level' and 'ira_def_level' are the values inherited from
/// the parent. An OPTIONAL node adds one definition level. A REPEATED node other than
/// the root adds one repetition and one definition level, and becomes the immediate
/// repeated ancestor of its descendants.
/// Returns an error if 'schema' runs out of elements, or if a node's child count is
/// negative or above SCHEMA_NODE_CHILDREN_SANITY_LIMIT.
Status ParquetSchemaResolver::CreateSchemaTree(
    const vector<parquet::SchemaElement>& schema, int max_def_level, int max_rep_level,
    int ira_def_level, int* idx, int* col_idx, SchemaNode* node)
    const {
  if (*idx >= schema.size()) {
    return Status(Substitute("File '$0' corrupt: could not reconstruct schema tree from "
        "flattened schema in file metadata", filename_));
  }
  bool is_root_schema = (*idx == 0);
  node->element = &schema[*idx];
  ++(*idx);

  if (node->element->num_children == 0) {
    // node is a leaf node, meaning it's materialized in the file and appears in
    // file_metadata_.row_groups.columns
    node->col_idx = *col_idx;
    ++(*col_idx);
  } else if (node->element->num_children > SCHEMA_NODE_CHILDREN_SANITY_LIMIT) {
    // Sanity-check the schema to avoid allocating absurdly large buffers below.
    return Status(Substitute("Schema in Parquet file '$0' has $1 children, more than "
        "limit of $2. File is likely corrupt", filename_, node->element->num_children,
        SCHEMA_NODE_CHILDREN_SANITY_LIMIT));
  } else if (node->element->num_children < 0) {
    return Status(Substitute("Corrupt Parquet file '$0': schema element has $1 children.",
        filename_, node->element->num_children));
  }

  // def_level_of_immediate_repeated_ancestor does not include this node, so set before
  // updating ira_def_level
  node->def_level_of_immediate_repeated_ancestor = ira_def_level;

  if (node->element->repetition_type == parquet::FieldRepetitionType::OPTIONAL) {
    ++max_def_level;
  } else if (node->element->repetition_type == parquet::FieldRepetitionType::REPEATED &&
      !is_root_schema /*PARQUET-843*/) {
    ++max_rep_level;
    // Repeated fields add a definition level. This is used to distinguish between an
    // empty list and a list with an item in it.
    ++max_def_level;
    // node is the new most immediate repeated ancestor
    ira_def_level = max_def_level;
  }
  node->max_def_level = max_def_level;
  node->max_rep_level = max_rep_level;

  // Children are passed this node's levels by value; the shared '*idx'/'*col_idx'
  // counters keep advancing across siblings.
  node->children.resize(node->element->num_children);
  for (int i = 0; i < node->element->num_children; ++i) {
    RETURN_IF_ERROR(CreateSchemaTree(schema, max_def_level, max_rep_level, ira_def_level,
        idx, col_idx, &node->children[i]));
  }
  return Status::OK();
}
+
+Status ParquetSchemaResolver::ResolvePath(const SchemaPath& path, SchemaNode** node,
+ bool* pos_field, bool* missing_field) const {
+ *missing_field = false;
+ const vector<ArrayEncoding>& ordered_array_encodings =
+ ORDERED_ARRAY_ENCODINGS[array_resolution_];
+
+ bool any_missing_field = false;
+ Status statuses[NUM_ARRAY_ENCODINGS];
+ for (const auto& array_encoding: ordered_array_encodings) {
+ bool current_missing_field;
+ statuses[array_encoding] = ResolvePathHelper(
+ array_encoding, path, node, pos_field, ¤t_missing_field);
+ if (current_missing_field) DCHECK(statuses[array_encoding].ok());
+ if (statuses[array_encoding].ok() && !current_missing_field) return Status::OK();
+ any_missing_field = any_missing_field || current_missing_field;
+ }
+ // None of resolutions yielded a node. Set *missing_field to true if any of the
+ // resolutions reported a missing a field.
+ if (any_missing_field) {
+ *node = NULL;
+ *missing_field = true;
+ return Status::OK();
+ }
+
+ // All resolutions failed. Log and return the most relevant status. The three-level
+ // encoding is the Parquet standard, so always prefer that. Prefer the two-level over
+ // the one-level because the two-level can be specifically selected via a query option.
+ Status error_status = Status::OK();
+ for (int i = THREE_LEVEL; i >= ONE_LEVEL; --i) {
+ if (!statuses[i].ok()) {
+ error_status = statuses[i];
+ break;
+ }
+ }
+ DCHECK(!error_status.ok());
+ *node = NULL;
+ VLOG_QUERY << error_status.msg().msg() << "\n" << GetStackTrace();
+ return error_status;
+}
+
/// Resolves 'path' against 'schema_', assuming arrays use 'array_encoding'.
/// Starts at the schema root. For each path element, '*node' is advanced via
/// NextSchemaNode(), except for the item of a one- or two-level encoded array, whose
/// repeated node already stands for the item. The table-side type 'col_type' is
/// advanced in step. Arrays and maps are then resolved with ResolveArray() and
/// ResolveMap(), and a scalar at the end of the path is checked with
/// ValidateScalarNode().
/// Returns OK early if the file lacks part of the path ('*missing_field' set) or if
/// ResolveArray() sets '*pos_field'. Otherwise returns OK with '*node' non-NULL, or the
/// first error from the resolve/validate helpers.
Status ParquetSchemaResolver::ResolvePathHelper(ArrayEncoding array_encoding,
    const SchemaPath& path, SchemaNode** node, bool* pos_field,
    bool* missing_field) const {
  DCHECK(schema_.element != NULL)
      << "schema_ must be initialized before calling ResolvePath()";

  *pos_field = false;
  *missing_field = false;
  *node = const_cast<SchemaNode*>(&schema_);
  const ColumnType* col_type = NULL;

  // Traverse 'path' and resolve 'node' to the corresponding SchemaNode in 'schema_' (by
  // ordinal), or set 'node' to NULL if 'path' doesn't exist in this file's schema.
  for (int i = 0; i < path.size(); ++i) {
    // Advance '*node' if necessary
    if (i == 0 || col_type->type != TYPE_ARRAY || array_encoding == THREE_LEVEL) {
      *node = NextSchemaNode(col_type, path, i, *node, missing_field);
      if (*missing_field) return Status::OK();
    } else {
      // We just resolved an array, meaning *node is set to the repeated field of the
      // array. Since we are trying to resolve using one- or two-level array encoding, the
      // repeated field represents both the array and the array's item (i.e. there is no
      // explicit item field), so we don't advance *node in this case.
      DCHECK(col_type != NULL);
      DCHECK_EQ(col_type->type, TYPE_ARRAY);
      DCHECK(array_encoding == ONE_LEVEL || array_encoding == TWO_LEVEL);
      DCHECK((*node)->is_repeated());
    }

    // Advance 'col_type'
    int table_idx = path[i];
    col_type = i == 0 ? &tbl_desc_.col_descs()[table_idx].type()
        : &col_type->children[table_idx];

    // Resolve path[i]
    if (col_type->type == TYPE_ARRAY) {
      DCHECK_EQ(col_type->children.size(), 1);
      RETURN_IF_ERROR(
          ResolveArray(array_encoding, path, i, node, pos_field, missing_field));
      if (*missing_field || *pos_field) return Status::OK();
    } else if (col_type->type == TYPE_MAP) {
      DCHECK_EQ(col_type->children.size(), 2);
      RETURN_IF_ERROR(ResolveMap(path, i, node, missing_field));
      if (*missing_field) return Status::OK();
    } else if (col_type->type == TYPE_STRUCT) {
      DCHECK_GT(col_type->children.size(), 0);
      // Nothing to do for structs
    } else {
      // A scalar can only appear as the last element of the path.
      DCHECK(!col_type->IsComplexType());
      DCHECK_EQ(i, path.size() - 1);
      RETURN_IF_ERROR(ValidateScalarNode(**node, *col_type, path, i));
    }
  }
  DCHECK(*node != NULL);
  return Status::OK();
}
+
+// Advances from 'node' to the child selected by path[next_idx].
+//
+// With NAME fallback resolution, top-level columns (next_idx == 0) and struct fields
+// are matched case-insensitively by name, array items use the array's only child, and
+// map keys/values are matched as "key"/"value", falling back to their position when no
+// child has that name. With POSITION resolution the path index is used as the child
+// index, after subtracting the table's clustering (partition) columns for top-level
+// columns.
+//
+// Returns NULL and sets '*missing_field' to true if the computed index does not name a
+// child of 'node'; '*missing_field' is not written otherwise.
+SchemaNode* ParquetSchemaResolver::NextSchemaNode(
+    const ColumnType* col_type, const SchemaPath& path, int next_idx, SchemaNode* node,
+    bool* missing_field) const {
+  DCHECK_LT(next_idx, path.size());
+  if (next_idx != 0) DCHECK(col_type != NULL);
+
+  int file_idx;
+  int table_idx = path[next_idx];
+  if (fallback_schema_resolution_ == TParquetFallbackSchemaResolution::type::NAME) {
+    if (next_idx == 0) {
+      // Resolve top-level table column by name.
+      DCHECK_LT(table_idx, tbl_desc_.col_descs().size());
+      const string& name = tbl_desc_.col_descs()[table_idx].name();
+      file_idx = FindChildWithName(node, name);
+    } else if (col_type->type == TYPE_STRUCT) {
+      // Resolve struct field by name.
+      DCHECK_LT(table_idx, col_type->field_names.size());
+      const string& name = col_type->field_names[table_idx];
+      file_idx = FindChildWithName(node, name);
+    } else if (col_type->type == TYPE_ARRAY) {
+      // Arrays have only one child in the file.
+      DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM);
+      file_idx = table_idx;
+    } else {
+      DCHECK_EQ(col_type->type, TYPE_MAP);
+      // Maps have two values, "key" and "value". These are supposed to be ordered and may
+      // not have the right field names, but try to resolve by name in case they're
+      // switched and otherwise use the order. See
+      // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for
+      // more details.
+      DCHECK(table_idx == SchemaPathConstants::MAP_KEY ||
+          table_idx == SchemaPathConstants::MAP_VALUE);
+      const string& name = table_idx == SchemaPathConstants::MAP_KEY ? "key" : "value";
+      file_idx = FindChildWithName(node, name);
+      if (file_idx >= node->children.size()) {
+        // Couldn't resolve by name, fall back to resolution by position.
+        file_idx = table_idx;
+      }
+    }
+  } else {
+    // Resolution by position.
+    DCHECK_EQ(fallback_schema_resolution_,
+        TParquetFallbackSchemaResolution::type::POSITION);
+    if (next_idx == 0) {
+      // For top-level columns, the first index in a path includes the table's partition
+      // keys.
+      file_idx = table_idx - tbl_desc_.num_clustering_cols();
+    } else {
+      file_idx = table_idx;
+    }
+  }
+
+  // Reject negative indices explicitly instead of relying on the implicit int -> size_t
+  // conversion of a signed/unsigned comparison to turn them into huge values. In the
+  // POSITION case 'file_idx' is negative if 'table_idx' is smaller than
+  // num_clustering_cols().
+  if (file_idx < 0 || static_cast<size_t>(file_idx) >= node->children.size()) {
+    string schema_resolution_mode = "unknown";
+    auto entry = _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.find(
+        fallback_schema_resolution_);
+    if (entry != _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.end()) {
+      schema_resolution_mode = entry->second;
+    }
+    VLOG_FILE << Substitute(
+        "File '$0' does not contain path '$1' (resolving by $2)", filename_,
+        PrintPath(tbl_desc_, path), schema_resolution_mode);
+    *missing_field = true;
+    return NULL;
+  }
+  return &node->children[file_idx];
+}
+
+// Scans the children of 'node' in order and returns the position of the first one whose
+// schema element name equals 'name', ignoring case. When nothing matches, the number of
+// children is returned so callers can treat the result as an out-of-range index.
+int ParquetSchemaResolver::FindChildWithName(SchemaNode* node,
+    const string& name) const {
+  const char* wanted = name.c_str();
+  int pos = 0;
+  while (pos < node->children.size()) {
+    const string& candidate = node->children[pos].element->name;
+    if (strcasecmp(candidate.c_str(), wanted) == 0) return pos;
+    ++pos;
+  }
+  return pos;
+}
+
+// Arrays can be stored in one of three encodings:
+//
+// 1. One-level encoding
+//    A bare repeated field, interpreted as a required array of required items.
+//    Example:
+//      repeated <item-type> item;
+//
+// 2. Two-level encoding
+//    A group whose only child is a repeated field, interpreted as a <list-repetition>
+//    array of required items (<list-repetition> is either optional or required).
+//    Example:
+//      <list-repetition> group <name> {
+//        repeated <item-type> item;
+//      }
+//
+// 3. Three-level encoding
+//    The "official" encoding according to the Parquet spec: a group whose only child is
+//    a repeated group that holds the item field, interpreted as a <list-repetition>
+//    array of <item-repetition> items (each repetition is either optional or required).
+//    Example:
+//      <list-repetition> group <name> {
+//        repeated group list {
+//          <item-repetition> <item-type> item;
+//        }
+//      }
+//
+// Field annotations and names are ignored, which is more permissive than the Parquet
+// spec dictates. In every encoding <item-type> may itself be a group containing more
+// fields, i.e. a complex item type. See
+// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists for more
+// details and examples.
+//
+// ResolveArray() checks that the array at '*node' has the shape required by
+// 'array_encoding' and then points '*node' at the repeated field (for the one-level
+// encoding that is '*node' itself). If the next element of 'path' is the artificial
+// position field, '*pos_field' is set to true and '*node' is set to NULL instead. A
+// schema that does not match the encoding yields a Status::Expected error.
+// 'missing_field' is never written by this function.
+Status ParquetSchemaResolver::ResolveArray(ArrayEncoding array_encoding,
+    const SchemaPath& path, int idx, SchemaNode** node, bool* pos_field,
+    bool* missing_field) const {
+  SchemaNode* array_node = *node;
+  // Two- and three-level encodings wrap the repeated field in an outer group with
+  // exactly one child; in the one-level encoding the array node is the repeated field.
+  const bool wrapped = array_encoding != ONE_LEVEL;
+  bool shape_ok;
+  if (wrapped) {
+    shape_ok = array_node->children.size() == 1 && array_node->children[0].is_repeated();
+  } else {
+    shape_ok = array_node->is_repeated();
+  }
+  if (!shape_ok) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), "array", array_node->DebugString());
+    return Status::Expected(msg);
+  }
+  if (wrapped) *node = &array_node->children[0];
+  DCHECK((*node)->is_repeated());
+
+  // The path ends at the array itself: nothing left to check.
+  if (idx + 1 >= path.size()) return Status::OK();
+
+  if (path[idx + 1] != SchemaPathConstants::ARRAY_POS) {
+    // Otherwise the next path element must select the array item.
+    DCHECK_EQ(path[idx + 1], SchemaPathConstants::ARRAY_ITEM);
+    return Status::OK();
+  }
+  DCHECK_EQ(path.size(), idx + 2) << "position field cannot have children!";
+  *pos_field = true;
+  *node = NULL;
+  return Status::OK();
+}
+
+// According to the Parquet spec, map columns are laid out as:
+//   <map-repetition> group <name> (MAP) {
+//     repeated group key_value {
+//       required <key-type> key;
+//       <value-repetition> <value-type> value;
+//     }
+//   }
+// Annotations and field names are not checked, which is more permissive than the
+// Parquet spec dictates: any node whose only child is a repeated group with exactly two
+// children is accepted. See
+// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for more
+// details.
+//
+// On success '*node' is advanced to that repeated key/value group. A mismatching schema
+// yields a Status::Expected error. 'missing_field' is never written by this function.
+Status ParquetSchemaResolver::ResolveMap(const SchemaPath& path, int idx,
+    SchemaNode** node, bool* missing_field) const {
+  SchemaNode* map_node = *node;
+  const bool has_single_repeated_child =
+      map_node->children.size() == 1 && map_node->children[0].is_repeated();
+  if (!has_single_repeated_child || map_node->children[0].children.size() != 2) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), "map", map_node->DebugString());
+    return Status::Expected(msg);
+  }
+  *node = &map_node->children[0];
+
+  // When the path continues past the map, the next element must pick the key or value.
+  if (idx + 1 < path.size()) {
+    DCHECK(path[idx + 1] == SchemaPathConstants::MAP_KEY ||
+        path[idx + 1] == SchemaPathConstants::MAP_VALUE);
+  }
+  return Status::OK();
+}
+
+// Checks that the file-schema node 'node' can back a scalar table column of type
+// 'col_type': it must be a leaf (no children) and IsSupportedType() must accept its
+// schema element for 'col_type'. Otherwise returns a Status::Expected
+// PARQUET_UNRECOGNIZED_SCHEMA error that names the offending path via PrintSubPath().
+Status ParquetSchemaResolver::ValidateScalarNode(const SchemaNode& node,
+    const ColumnType& col_type, const SchemaPath& path, int idx) const {
+  const bool is_leaf = node.children.empty();
+  // Short-circuit: the type is only inspected for leaf nodes.
+  if (is_leaf && IsSupportedType(col_type.type, *node.element)) return Status::OK();
+
+  ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+      PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
+  return Status::Expected(msg);
+}
+
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-metadata-utils.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-metadata-utils.h b/be/src/exec/parquet/parquet-metadata-utils.h
new file mode 100644
index 0000000..f3a144d
--- /dev/null
+++ b/be/src/exec/parquet/parquet-metadata-utils.h
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_METADATA_UTILS_H
+#define IMPALA_EXEC_PARQUET_METADATA_UTILS_H
+
+#include <string>
+
+#include "runtime/descriptors.h"
+#include "gen-cpp/parquet_types.h"
+
+namespace impala {
+
+class RuntimeState;
+
+/// Static helpers that validate Parquet file metadata before Impala reads a file.
+/// Each Validate*() function returns Status::OK() if the checked metadata is usable
+/// and a non-OK status describing the problem otherwise.
+class ParquetMetadataUtils {
+ public:
+  /// Checks the version of the given file and returns a non-OK status if
+  /// Impala does not support that version.
+  static Status ValidateFileVersion(const parquet::FileMetaData& file_metadata,
+      const char* filename);
+
+  /// Validate column offsets by checking if the dictionary page comes before the data
+  /// pages and checking if the column offsets lie within the file.
+  /// ('filename' is std::string-qualified here, consistent with the rest of this
+  /// header, rather than relying on a 'using std::string' being in scope.)
+  static Status ValidateColumnOffsets(const std::string& filename, int64_t file_length,
+      const parquet::RowGroup& row_group);
+
+  /// Check that a file offset is in the file. Return an error status with a detailed
+  /// error message if it is not.
+  static Status ValidateOffsetInFile(const std::string& filename, int col_idx,
+      int64_t file_length, int64_t offset, const std::string& offset_name);
+
+  /// Validates the column metadata inside a row group to make sure this column is
+  /// supported (e.g. encoding, type, etc).
+  static Status ValidateRowGroupColumn(const parquet::FileMetaData& file_metadata,
+      const char* filename, int row_group_idx, int col_idx,
+      const parquet::SchemaElement& schema_element, RuntimeState* state);
+
+  /// Validates the column metadata to make sure the column is supported and its type
+  /// attributes conform to the parquet spec.
+  static Status ValidateColumn(const char* filename,
+      const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
+      RuntimeState* state);
+};
+
+/// Identifies the application (and its version) that wrote a Parquet file.
+struct ParquetFileVersion {
+  /// Application that wrote the file. e.g. "IMPALA"
+  std::string application;
+
+  /// Version of the application that wrote the file, expressed in three parts
+  /// (<major>.<minor>.<patch>). Unspecified parts default to 0, and extra parts are
+  /// ignored. e.g.:
+  /// "1.2.3" => {1, 2, 3}
+  /// "1.2" => {1, 2, 0}
+  /// "1.2-cdh5" => {1, 2, 0}
+  /// Every part is zero-initialized, so a default-constructed ParquetFileVersion
+  /// reports 0.0.0 rather than indeterminate values (which VersionLt()/VersionEq()
+  /// would otherwise read).
+  struct {
+    int major = 0;
+    int minor = 0;
+    int patch = 0;
+  } version;
+
+  /// If true, this file was generated by an Impala internal release
+  bool is_impala_internal;
+
+  ParquetFileVersion() : is_impala_internal(false) { }
+
+  /// Parses the version from the created_by string
+  ParquetFileVersion(const std::string& created_by);
+
+  /// Returns true if version is strictly less than <major>.<minor>.<patch>
+  bool VersionLt(int major, int minor = 0, int patch = 0) const;
+
+  /// Returns true if version is equal to <major>.<minor>.<patch>
+  bool VersionEq(int major, int minor, int patch) const;
+};
+
+/// Internal representation of a Parquet schema (including nested-type columns). The
+/// nodes form a tree rebuilt from the file metadata's flattened schema element list.
+struct SchemaNode {
+  /// Schema element from the file metadata that this node describes.
+  const parquet::SchemaElement* element = nullptr;
+
+  /// Position of this column in RowGroup::columns when it is materialized in the file
+  /// (i.e. it's a scalar type); -1 for nested types.
+  int col_idx = -1;
+
+  /// Maximum definition level of this column, i.e. the level that corresponds to a
+  /// non-NULL value. Valid values are >= 0; -1 until set.
+  int max_def_level = -1;
+
+  /// Maximum repetition level of this column. Valid values are >= 0; -1 until set.
+  int max_rep_level = -1;
+
+  /// Definition level of the nearest ancestor whose repetition type is REPEATED, or 0
+  /// when no ancestor is repeated. -1 until set.
+  int def_level_of_immediate_repeated_ancestor = -1;
+
+  /// Nested schema nodes; empty for non-nested types.
+  std::vector<SchemaNode> children;
+
+  SchemaNode() { }
+
+  std::string DebugString(int indent = 0) const;
+
+  /// True if this node's field is declared 'repeated' in the file schema.
+  bool is_repeated() const {
+    const auto repetition = element->repetition_type;
+    return repetition == parquet::FieldRepetitionType::REPEATED;
+  }
+};
+
+/// Utility class to resolve SchemaPaths (e.g., from a table descriptor) against a
+/// Parquet file schema. Supports resolution by field index or by field name.
+/// Supports different policies for resolving nested arrays based on the modern
+/// three-level encoding or the legacy encodings (one and two level).
+class ParquetSchemaResolver {
+ public:
+  /// 'tbl_desc' is held by reference and must outlive this resolver.
+  /// 'fallback_schema_resolution' selects resolution by field name or by position;
+  /// 'array_resolution' selects which array encodings ResolvePath() tries, and in which
+  /// order (see ORDERED_ARRAY_ENCODINGS).
+  ParquetSchemaResolver(const HdfsTableDescriptor& tbl_desc,
+      TParquetFallbackSchemaResolution::type fallback_schema_resolution,
+      TParquetArrayResolution::type array_resolution)
+    : tbl_desc_(tbl_desc),
+      fallback_schema_resolution_(fallback_schema_resolution),
+      array_resolution_(array_resolution),
+      filename_(NULL) {
+  }
+
+  /// Parses the schema of the given file metadata into an internal schema
+  /// representation used in path resolution. Remembers the filename for error
+  /// reporting; the pointer is stored, not copied, so 'filename' must stay valid while
+  /// this resolver is used. Returns a non-OK status if the Parquet schema could not be
+  /// parsed.
+  Status Init(const parquet::FileMetaData* file_metadata, const char* filename) {
+    DCHECK(filename != NULL);
+    filename_ = filename;
+    return CreateSchemaTree(file_metadata->schema, &schema_);
+  }
+
+  /// Traverses 'schema_' according to 'path', returning the result in 'node'. If 'path'
+  /// does not exist in this file's schema, 'missing_field' is set to true and
+  /// Status::OK() is returned, otherwise 'missing_field' is set to false. If 'path'
+  /// resolves to a collection position field, *pos_field is set to true. Otherwise
+  /// 'pos_field' is set to false. Returns a non-OK status if 'path' cannot be resolved
+  /// against the file's schema (e.g., unrecognized collection schema).
+  ///
+  /// Tries to resolve fields within lists using, in order, each array encoding that
+  /// ORDERED_ARRAY_ENCODINGS lists for 'array_resolution_'. Returns a bad status if
+  /// resolution fails for all attempted array encodings.
+  Status ResolvePath(const SchemaPath& path, SchemaNode** node, bool* pos_field,
+      bool* missing_field) const;
+
+ private:
+  /// The ways a Parquet list can be laid out in the file schema: one-, two- or
+  /// three-level encoding. NUM_ARRAY_ENCODINGS is the number of encodings.
+  enum ArrayEncoding {
+    ONE_LEVEL,
+    TWO_LEVEL,
+    THREE_LEVEL,
+    NUM_ARRAY_ENCODINGS
+  };
+
+  /// An arbitrary limit on the number of children per schema node we support.
+  /// Used to sanity-check Parquet schemas.
+  static const int SCHEMA_NODE_CHILDREN_SANITY_LIMIT = 64 * 1024;
+
+  /// Maps from the array-resolution policy to the ordered array encodings that should
+  /// be tried during path resolution. All entries have the ONE_LEVEL encoding at the end
+  /// because there is no ambiguity between the one-level and the other encodings (there
+  /// is no harm in trying it).
+  static const std::vector<ArrayEncoding> ORDERED_ARRAY_ENCODINGS[];
+
+  /// Unflattens the schema metadata from a Parquet file metadata and converts it to our
+  /// SchemaNode representation. Returns the result in 'node' unless an error status is
+  /// returned.
+  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
+      SchemaNode* node) const;
+
+  /// Recursive implementation used internally by the above CreateSchemaTree() function.
+  /// 'idx' and 'col_idx' are cursors advanced as elements and materialized columns are
+  /// consumed; 'ira_def_level' is presumably the definition level of the immediate
+  /// repeated ancestor (cf. SchemaNode::def_level_of_immediate_repeated_ancestor).
+  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
+      int max_def_level, int max_rep_level, int ira_def_level, int* idx, int* col_idx,
+      SchemaNode* node) const;
+
+  /// Resolves 'path' like ResolvePath(), but assuming the single array encoding
+  /// 'array_encoding' (one-, two-, or three-level). The returned status is not logged
+  /// (i.e. it's an expected error).
+  Status ResolvePathHelper(ArrayEncoding array_encoding, const SchemaPath& path,
+      SchemaNode** node, bool* pos_field, bool* missing_field) const;
+
+  /// Helper functions for ResolvePathHelper().
+
+  /// Advances 'node' to one of its children based on path[next_idx] and
+  /// 'col_type'. 'col_type' is NULL if 'node' is the root node, otherwise it's the type
+  /// associated with 'node'. Returns the child node or sets 'missing_field' to true.
+  SchemaNode* NextSchemaNode(const ColumnType* col_type, const SchemaPath& path,
+      int next_idx, SchemaNode* node, bool* missing_field) const;
+
+  /// Returns the index of 'node's child with 'name', or the number of children if not
+  /// found. The name comparison is case-insensitive because that's how Impala treats
+  /// db/table/column/field names. If there are several matches with different casing,
+  /// then the index of the first match is returned.
+  int FindChildWithName(SchemaNode* node, const std::string& name) const;
+
+  /// The ResolvePathHelper() logic for arrays.
+  Status ResolveArray(ArrayEncoding array_encoding, const SchemaPath& path, int idx,
+      SchemaNode** node, bool* pos_field, bool* missing_field) const;
+
+  /// The ResolvePathHelper() logic for maps.
+  Status ResolveMap(const SchemaPath& path, int idx, SchemaNode** node,
+      bool* missing_field) const;
+
+  /// The ResolvePathHelper() logic for scalars (just does validation since there's no
+  /// more actual work to be done).
+  Status ValidateScalarNode(const SchemaNode& node, const ColumnType& col_type,
+      const SchemaPath& path, int idx) const;
+
+  /// Not owned; must outlive this resolver.
+  const HdfsTableDescriptor& tbl_desc_;
+  const TParquetFallbackSchemaResolution::type fallback_schema_resolution_;
+  const TParquetArrayResolution::type array_resolution_;
+  /// Not owned; set by Init().
+  const char* filename_;
+
+  /// Root node of our internal schema representation populated in Init().
+  SchemaNode schema_;
+};
+
+} // impala namespace
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-plain-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-plain-test.cc b/be/src/exec/parquet/parquet-plain-test.cc
new file mode 100644
index 0000000..6eb880f
--- /dev/null
+++ b/be/src/exec/parquet/parquet-plain-test.cc
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <iostream>
+#include <memory>
+#include <vector>
+
+#include "exec/parquet/parquet-common.h"
+#include "runtime/decimal-value.h"
+#include "runtime/string-value.inline.h"
+#include "runtime/timestamp-value.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+/// Test helper that plain-encodes 'v' into 'buffer' and returns the number of bytes
+/// written. 'physical_type' is not consulted by this generic version; the decimal
+/// specializations use it to pick between their possible encodings.
+template <typename InternalType>
+int Encode(const InternalType& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type physical_type) {
+  const int bytes_written = ParquetPlainEncoder::Encode(v, encoded_byte_size, buffer);
+  return bytes_written;
+}
+
+// Encodes decimal 'v' into 'buffer' using the layout for 'parquet_type'. Impala does
+// not implement writing decimals as BYTE_ARRAY, INT32 or INT64, so those layouts are
+// produced by hand here:
+//  - FIXED_LEN_BYTE_ARRAY: delegates to ParquetPlainEncoder::Encode().
+//  - BYTE_ARRAY: a 4-byte length prefix followed by the value written with
+//    DecimalUtil::EncodeToFixedLenByteArray(). 'encoded_byte_size' is the total size:
+//    the prefix plus the minimum number of bytes needed for the unscaled value. Leading
+//    zero bytes are not stripped as the Parquet spec requires, so callers must choose a
+//    size that already has none.
+//  - INT32 / INT64: the unscaled value is written as a plain integer.
+// Returns the number of bytes written, or -1 for any other physical type.
+template <typename DecimalType>
+int EncodeDecimal(const DecimalType& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  switch (parquet_type) {
+    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
+      return ParquetPlainEncoder::Encode(v, encoded_byte_size, buffer);
+    case parquet::Type::BYTE_ARRAY: {
+      int payload_len = encoded_byte_size - sizeof(int32_t);
+      memcpy(buffer, &payload_len, sizeof(int32_t));
+      uint8_t* payload = buffer + sizeof(int32_t);
+      DecimalUtil::EncodeToFixedLenByteArray(payload, payload_len, v);
+      return encoded_byte_size;
+    }
+    case parquet::Type::INT32:
+    case parquet::Type::INT64:
+      return ParquetPlainEncoder::Encode(v.value(), encoded_byte_size, buffer);
+    default:
+      return -1;
+  }
+}
+
+// Decimal values need the Parquet physical type to choose an encoding, so each decimal
+// width forwards to EncodeDecimal().
+template <>
+int Encode<Decimal4Value>(const Decimal4Value& v, int encoded_byte_size,
+    uint8_t* buffer, parquet::Type::type parquet_type) {
+  return EncodeDecimal<Decimal4Value>(v, encoded_byte_size, buffer, parquet_type);
+}
+
+template <>
+int Encode<Decimal8Value>(const Decimal8Value& v, int encoded_byte_size,
+    uint8_t* buffer, parquet::Type::type parquet_type) {
+  return EncodeDecimal<Decimal8Value>(v, encoded_byte_size, buffer, parquet_type);
+}
+
+template <>
+int Encode<Decimal16Value>(const Decimal16Value& v, int encoded_byte_size,
+    uint8_t* buffer, parquet::Type::type parquet_type) {
+  return EncodeDecimal<Decimal16Value>(v, encoded_byte_size, buffer, parquet_type);
+}
+
+/// Test that the decoder fails when asked to decode a truncated value: 'v' is encoded
+/// into 'expected_byte_size' bytes and Decode() must return -1 for every strict prefix
+/// of that encoding.
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void TestTruncate(const InternalType& v, int expected_byte_size) {
+  // std::vector rather than a variable-length array, which is not standard C++.
+  std::vector<uint8_t> buffer(expected_byte_size);
+  int encoded_size = Encode(v, expected_byte_size, buffer.data(), PARQUET_TYPE);
+  EXPECT_EQ(encoded_size, expected_byte_size);
+
+  // Check all possible truncations of the buffer.
+  for (int truncated_size = encoded_size - 1; truncated_size >= 0; --truncated_size) {
+    InternalType result;
+    /// Copy to an exactly-sized heap allocation so that ASAN can detect buffer
+    /// overruns; unique_ptr owns it instead of a manual new[]/delete[] pair.
+    std::unique_ptr<uint8_t[]> truncated_buffer(new uint8_t[truncated_size]);
+    memcpy(truncated_buffer.get(), buffer.data(), truncated_size);
+    int decoded_size = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
+        truncated_buffer.get(), truncated_buffer.get() + truncated_size,
+        expected_byte_size, &result);
+    EXPECT_EQ(-1, decoded_size);
+  }
+}
+
+/// Widening variant of TestTruncate(): 'v' is encoded as InternalType and every strict
+/// prefix of the encoding must be rejected (Decode() returns -1) when decoding into the
+/// wider WidenInternalType.
+template <typename InternalType, typename WidenInternalType,
+    parquet::Type::type PARQUET_TYPE>
+void TestTruncate(const InternalType& v, int expected_byte_size) {
+  // std::vector rather than a variable-length array, which is not standard C++.
+  std::vector<uint8_t> buffer(expected_byte_size);
+  int encoded_size = Encode(v, expected_byte_size, buffer.data(), PARQUET_TYPE);
+  EXPECT_EQ(encoded_size, expected_byte_size);
+
+  // Check all possible truncations of the buffer.
+  for (int truncated_size = encoded_size - 1; truncated_size >= 0; --truncated_size) {
+    WidenInternalType result;
+    /// Copy to an exactly-sized heap allocation so that ASAN can detect buffer
+    /// overruns; unique_ptr owns it instead of a manual new[]/delete[] pair.
+    std::unique_ptr<uint8_t[]> truncated_buffer(new uint8_t[truncated_size]);
+    memcpy(truncated_buffer.get(), buffer.data(), truncated_size);
+    int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType, PARQUET_TYPE>(
+        truncated_buffer.get(), truncated_buffer.get() + truncated_size,
+        expected_byte_size, &result);
+    EXPECT_EQ(-1, decoded_size);
+  }
+}
+
+/// Round-trips 'v' through Encode() and Decode() for PARQUET_TYPE, checking that both
+/// report 'expected_byte_size' bytes and that the decoded value equals 'v'; then checks
+/// that every truncated encoding is rejected.
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void TestType(const InternalType& v, int expected_byte_size) {
+  // std::vector rather than a variable-length array, which is not standard C++.
+  std::vector<uint8_t> buffer(expected_byte_size);
+  int encoded_size = Encode(v, expected_byte_size, buffer.data(), PARQUET_TYPE);
+  EXPECT_EQ(encoded_size, expected_byte_size);
+
+  InternalType result;
+  int decoded_size = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
+      buffer.data(), buffer.data() + expected_byte_size, expected_byte_size, &result);
+  EXPECT_EQ(decoded_size, expected_byte_size);
+  EXPECT_EQ(result, v);
+
+  TestTruncate<InternalType, PARQUET_TYPE>(v, expected_byte_size);
+}
+
+/// Encodes 'v' as InternalType and decodes it into the wider WidenInternalType,
+/// checking the byte counts and that the widened value compares equal to 'v'; then
+/// checks that every truncated encoding is rejected.
+template <typename InternalType, typename WidenInternalType,
+    parquet::Type::type PARQUET_TYPE>
+void TestTypeWidening(const InternalType& v, int expected_byte_size) {
+  // std::vector rather than a variable-length array, which is not standard C++.
+  std::vector<uint8_t> buffer(expected_byte_size);
+  int encoded_size = Encode(v, expected_byte_size, buffer.data(), PARQUET_TYPE);
+  EXPECT_EQ(encoded_size, expected_byte_size);
+
+  WidenInternalType result;
+  int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType, PARQUET_TYPE>(
+      buffer.data(), buffer.data() + expected_byte_size, expected_byte_size, &result);
+  EXPECT_EQ(decoded_size, expected_byte_size);
+  EXPECT_EQ(v, result);
+
+  TestTruncate<InternalType, WidenInternalType, PARQUET_TYPE>(
+      v, expected_byte_size);
+}
+
+/// Round-trips representative values for each (internal type, Parquet physical type)
+/// combination through TestType()/TestTypeWidening(), which also check that every
+/// truncated encoding is rejected. Decimals are covered for each physical type that
+/// EncodeDecimal() supports, including int32/int64 boundary values and encodings
+/// narrower than the decimal's in-memory width.
+TEST(PlainEncoding, Basic) {
+ int8_t i8 = 12;
+ int16_t i16 = 123;
+ int32_t i32 = 1234;
+ int64_t i64 = 12345;
+ float f = 1.23;
+ double d = 1.23456;
+ StringValue sv("Hello");
+ TimestampValue tv;
+
+ // Non-decimal types. Small integer types are stored as INT32; BYTE_ARRAY strings carry
+ // a 4-byte length prefix; INT96 timestamps occupy 12 bytes.
+ TestType<int8_t, parquet::Type::INT32>(i8, sizeof(int32_t));
+ TestType<int16_t, parquet::Type::INT32>(i16, sizeof(int32_t));
+ TestType<int32_t, parquet::Type::INT32>(i32, sizeof(int32_t));
+ TestType<int64_t, parquet::Type::INT64>(i64, sizeof(int64_t));
+ TestType<float, parquet::Type::FLOAT>(f, sizeof(float));
+ TestType<double, parquet::Type::DOUBLE>(d, sizeof(double));
+ TestType<StringValue, parquet::Type::BYTE_ARRAY>(sv, sizeof(int32_t) + sv.len);
+ TestType<TimestampValue, parquet::Type::INT96>(tv, 12);
+
+ // Test type widening.
+ TestTypeWidening<int32_t, int64_t, parquet::Type::INT32>(i32, sizeof(int32_t));
+ TestTypeWidening<int32_t, double, parquet::Type::INT32>(i32, sizeof(int32_t));
+ TestTypeWidening<float, double, parquet::Type::FLOAT>(f, sizeof(float));
+
+ // BYTE_ARRAY decimals are a 4-byte length prefix followed by the minimal number of
+ // bytes for the unscaled value; +/-1234 fits in 2 bytes.
+ int test_val = 1234;
+ int var_len_decimal_size = sizeof(int32_t)
+ + 2 /*min bytes required for storing test_val*/;
+ // Decimal4Value: General test case
+ TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(test_val),
+ var_len_decimal_size);
+ TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(test_val * -1),
+ var_len_decimal_size);
+ TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(test_val),
+ sizeof(Decimal4Value));
+ TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+ Decimal4Value(test_val * -1), sizeof(Decimal4Value));
+ TestType<Decimal4Value, parquet::Type::INT32>(Decimal4Value(test_val),
+ sizeof(int32_t));
+ TestType<Decimal4Value, parquet::Type::INT32>(Decimal4Value(test_val * -1),
+ sizeof(int32_t));
+
+ // Decimal8Value: General test case
+ TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(test_val),
+ var_len_decimal_size);
+ TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(test_val * -1),
+ var_len_decimal_size);
+ TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(test_val),
+ sizeof(Decimal8Value));
+ TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+ Decimal8Value(test_val * -1), sizeof(Decimal8Value));
+ TestType<Decimal8Value, parquet::Type::INT64>(Decimal8Value(test_val),
+ sizeof(int64_t));
+ TestType<Decimal8Value, parquet::Type::INT64>(Decimal8Value(test_val * -1),
+ sizeof(int64_t));
+
+ // Decimal16Value: General test case (INT32/INT64 layouts are not exercised here).
+ TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(test_val),
+ var_len_decimal_size);
+ TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(test_val * -1),
+ var_len_decimal_size);
+ TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>( Decimal16Value(test_val),
+ sizeof(Decimal16Value));
+ TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+ Decimal16Value(test_val * -1), sizeof(Decimal16Value));
+
+ // Decimal8Value: int32 limits test. The extreme int32 values need all 4 bytes of the
+ // BYTE_ARRAY payload.
+ TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(
+ Decimal8Value(std::numeric_limits<int32_t>::max()),
+ sizeof(int32_t) + sizeof(int32_t));
+ TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(
+ Decimal8Value(std::numeric_limits<int32_t>::min()),
+ sizeof(int32_t) + sizeof(int32_t));
+ TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+ Decimal8Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal8Value));
+ TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+ Decimal8Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal8Value));
+ TestType<Decimal8Value, parquet::Type::INT64>(
+ Decimal8Value(std::numeric_limits<int32_t>::max()), sizeof(int64_t));
+ TestType<Decimal8Value, parquet::Type::INT64>(
+ Decimal8Value(std::numeric_limits<int32_t>::min()), sizeof(int64_t));
+
+ // Decimal16Value: int32 limits test
+ TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+ Decimal16Value(std::numeric_limits<int32_t>::max()),
+ sizeof(int32_t) + sizeof(int32_t));
+ TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+ Decimal16Value(std::numeric_limits<int32_t>::min()),
+ sizeof(int32_t) + sizeof(int32_t));
+ TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+ Decimal16Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal16Value));
+ TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+ Decimal16Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal16Value));
+
+ // Decimal16Value: int64 limits test. The extreme int64 values need all 8 bytes of the
+ // BYTE_ARRAY payload.
+ TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+ Decimal16Value(std::numeric_limits<int64_t>::max()),
+ sizeof(int32_t) + sizeof(int64_t));
+ TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+ Decimal16Value(std::numeric_limits<int64_t>::min()),
+ sizeof(int32_t) + sizeof(int64_t));
+ TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+ Decimal16Value(std::numeric_limits<int64_t>::max()), sizeof(Decimal16Value));
+ TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+ Decimal16Value(std::numeric_limits<int64_t>::min()), sizeof(Decimal16Value));
+
+ // The values 1..16 and their negations fit in a single byte, so they can be encoded
+ // with any byte size from 1 up to the decimal's in-memory width.
+ for (int i = 1; i <=16; ++i) {
+ if (i <= 4) {
+ TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(i),
+ i + sizeof(int32_t));
+ TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(-i),
+ i + sizeof(int32_t));
+ TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(i), i);
+ TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(-i), i);
+ TestType<Decimal4Value, parquet::Type::INT32>(Decimal4Value(i), sizeof(int32_t));
+ TestType<Decimal4Value, parquet::Type::INT32>(Decimal4Value(-i), sizeof(int32_t));
+ }
+ if (i <= 8) {
+ TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(i),
+ i + sizeof(int32_t));
+ TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(-i),
+ i + sizeof(int32_t));
+ TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(i), i);
+ TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(-i), i);
+ TestType<Decimal8Value, parquet::Type::INT64>(Decimal8Value(i), sizeof(int64_t));
+ TestType<Decimal8Value, parquet::Type::INT64>(Decimal8Value(-i), sizeof(int64_t));
+ }
+ TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(i),
+ i + sizeof(int32_t));
+ TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(-i),
+ i + sizeof(int32_t));
+ TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal16Value(i), i);
+ TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal16Value(-i), i);
+ }
+}
+
+TEST(PlainEncoding, DecimalBigEndian) {
+  // PlainEncoding.Basic only round-trips values, so it would still pass if Encode() and
+  // Decode() made the same byte-order mistake. Here the encoded bytes are checked
+  // directly: each encoding must be the value's in-memory bytes in reverse order (this
+  // expectation relies on a little-endian host).
+  uint8_t native_bytes[] = {
+      0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+  // The reverse of 'native_bytes', written out by hand rather than computed with
+  // BitUtil so that a byte-swapping bug there cannot hide one in the encoder.
+  uint8_t reversed_bytes[] = {
+      15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
+  uint8_t encoded[16];
+
+  // Each decimal holds the first sizeof(value) bytes of 'native_bytes', so its
+  // big-endian encoding is the last sizeof(value) bytes of 'reversed_bytes'.
+  Decimal4Value d4;
+  memcpy(&d4, native_bytes, sizeof(d4));
+  int size = ParquetPlainEncoder::Encode(d4, sizeof(d4), encoded);
+  ASSERT_EQ(size, sizeof(d4));
+  ASSERT_EQ(memcmp(encoded, reversed_bytes + 16 - sizeof(d4), sizeof(d4)), 0);
+
+  Decimal8Value d8;
+  memcpy(&d8, native_bytes, sizeof(d8));
+  size = ParquetPlainEncoder::Encode(d8, sizeof(d8), encoded);
+  ASSERT_EQ(size, sizeof(d8));
+  ASSERT_EQ(memcmp(encoded, reversed_bytes + 16 - sizeof(d8), sizeof(d8)), 0);
+
+  Decimal16Value d16;
+  memcpy(&d16, native_bytes, sizeof(d16));
+  size = ParquetPlainEncoder::Encode(d16, sizeof(d16), encoded);
+  ASSERT_EQ(size, sizeof(d16));
+  ASSERT_EQ(memcmp(encoded, reversed_bytes + 16 - sizeof(d16), sizeof(d16)), 0);
+}
+
+/// A BYTE_ARRAY value whose 4-byte length prefix is negative must be rejected:
+/// Decode() has to return -1 for it.
+TEST(PlainEncoding, CorruptString) {
+  const int32_t negative_len = -10;
+  uint8_t encoded[sizeof(int32_t) + 10];
+  memcpy(encoded, &negative_len, sizeof(int32_t));
+
+  StringValue decoded;
+  const int bytes_consumed =
+      ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>(
+          encoded, encoded + sizeof(encoded), 0, &decoded);
+  EXPECT_EQ(bytes_consumed, -1);
+}
+
+}
+
+IMPALA_TEST_MAIN();
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-scratch-tuple-batch.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-scratch-tuple-batch.h b/be/src/exec/parquet/parquet-scratch-tuple-batch.h
new file mode 100644
index 0000000..1b79be1
--- /dev/null
+++ b/be/src/exec/parquet/parquet-scratch-tuple-batch.h
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H
+#define IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H
+
+#include "runtime/descriptors.h"
+#include "runtime/row-batch.h"
+#include "runtime/tuple-row.h"
+
+namespace impala {
+
+/// Helper struct that holds a batch of tuples allocated from a mem pool, as well
+/// as state associated with iterating over its tuples and transferring
+/// them to an output batch in TransferScratchTuples().
+///
+/// 'tuple_mem' lives in 'tuple_mem_pool'. Whenever that pool's memory is handed off
+/// (to an output RowBatch or to another MemPool), 'tuple_mem' is reset to nullptr and
+/// a fresh buffer is allocated by the next call to Reset().
+struct ScratchTupleBatch {
+  // Memory for the fixed-length parts of the batch of tuples. Allocated from
+  // 'tuple_mem_pool'. Set to nullptr when transferred to an output batch.
+  uint8_t* tuple_mem = nullptr;
+  // Number of tuples that can be stored in 'tuple_mem'.
+  int capacity;
+  // Keeps track of the current tuple index.
+  int tuple_idx = 0;
+  // Number of valid tuples in tuple_mem.
+  int num_tuples = 0;
+  // Number of tuples transferred to output batches (i.e. not filtered by predicates).
+  // num_tuples_transferred > 0 before a call to FinalizeTupleTransfer() implies that
+  // tuples from the current scratch batch were transferred to a previous output batch.
+  int num_tuples_transferred = 0;
+  // Bytes of fixed-length data per tuple.
+  const int tuple_byte_size;
+
+  // Pool used to allocate 'tuple_mem' and nothing else.
+  MemPool tuple_mem_pool;
+
+  // Pool used to accumulate other memory that may be referenced by var-len slots in this
+  // batch, e.g. decompression buffers, allocations for var-len strings and allocations
+  // for nested arrays. This memory may be referenced by previous batches or the current
+  // batch, but not by future batches. E.g. a decompression buffer can be safely attached
+  // only once all values referencing that buffer have been materialized into the batch.
+  MemPool aux_mem_pool;
+
+  // Tuples transferred to an output row batch are compacted if
+  // (# tuples materialized / # tuples returned) exceeds this number. Chosen so that the
+  // cost of copying the tuples should be very small in relation to the original cost of
+  // materialising them.
+  const int MIN_SELECTIVITY_TO_COMPACT = 16;
+
+  ScratchTupleBatch(
+      const RowDescriptor& row_desc, int batch_size, MemTracker* mem_tracker)
+    : capacity(batch_size),
+      tuple_byte_size(row_desc.GetRowSize()),
+      tuple_mem_pool(mem_tracker),
+      aux_mem_pool(mem_tracker) {
+    // Rows of the scratch batch consist of exactly one tuple (see GetTuple(0) below).
+    DCHECK_EQ(row_desc.tuple_descriptors().size(), 1);
+  }
+
+  /// Prepares the batch to be filled with a new set of tuples: clears the iteration
+  /// counters and, if 'tuple_mem' was released or transferred, allocates a new tuple
+  /// buffer from 'tuple_mem_pool'. 'capacity' is passed to the allocator by pointer and
+  /// may be updated by it. Returns an error if the allocation fails.
+  Status Reset(RuntimeState* state) {
+    tuple_idx = 0;
+    num_tuples = 0;
+    num_tuples_transferred = 0;
+    if (tuple_mem == nullptr) {
+      int64_t dummy;
+      RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(
+          state, &tuple_mem_pool, tuple_byte_size, &capacity, &dummy, &tuple_mem));
+    }
+    return Status::OK();
+  }
+
+  /// Release all memory in the MemPools. If 'dst_pool' is non-null, transfers it to
+  /// 'dst_pool'. Otherwise frees the memory. In both cases 'tuple_mem' becomes nullptr.
+  void ReleaseResources(MemPool* dst_pool) {
+    if (dst_pool == nullptr) {
+      tuple_mem_pool.FreeAll();
+      aux_mem_pool.FreeAll();
+    } else {
+      dst_pool->AcquireData(&tuple_mem_pool, false);
+      dst_pool->AcquireData(&aux_mem_pool, false);
+    }
+    tuple_mem = nullptr;
+  }
+
+  /// Finalize transfer of 'num_to_commit' tuples to 'dst_batch' and transfer memory to
+  /// 'dst_batch' if at the end of 'scratch_batch'. The tuples must not yet be
+  /// committed to 'dst_batch'. Only needs to be called when materialising non-empty
+  /// tuples.
+  void FinalizeTupleTransfer(RowBatch* dst_batch, int num_to_commit) {
+    DCHECK_GE(num_to_commit, 0);
+    DCHECK_LE(dst_batch->num_rows() + num_to_commit, dst_batch->capacity());
+    DCHECK_LE(num_tuples_transferred + num_to_commit, num_tuples);
+    DCHECK(tuple_mem != nullptr);
+    num_tuples_transferred += num_to_commit;
+    if (!AtEnd()) return;
+    // We're at the end of the scratch batch. Transfer memory that may be referenced by
+    // transferred tuples or that we can't reuse to 'dst_batch'.
+
+    // Future tuples won't reference data in 'aux_mem_pool' - always transfer so that
+    // we don't accumulate unneeded memory in the scratch batch.
+    dst_batch->tuple_data_pool()->AcquireData(&aux_mem_pool, false);
+
+    // Try to avoid the transfer of 'tuple_mem' for selective scans by compacting the
+    // output batch. This avoids excessive allocation and transfer of memory, which
+    // can lead to performance problems like IMPALA-4923.
+    // Compaction is unsafe if the scratch batch was split across multiple output batches
+    // because the batch we returned earlier may hold a reference into 'tuple_mem'.
+    if (num_tuples_transferred > num_to_commit
+        || num_tuples_transferred * MIN_SELECTIVITY_TO_COMPACT > num_tuples
+        || !TryCompact(dst_batch, num_to_commit)) {
+      // Didn't compact - rows in 'dst_batch' reference 'tuple_mem'.
+      dst_batch->tuple_data_pool()->AcquireData(&tuple_mem_pool, false);
+      tuple_mem = nullptr;
+    }
+  }
+
+  /// Try to compact 'num_uncommitted_tuples' uncommitted tuples that were added to
+  /// the end of 'dst_batch' by copying them to memory allocated from
+  /// dst_batch->tuple_data_pool(). Returns true on success or false if the memory
+  /// could not be allocated.
+  bool TryCompact(RowBatch* dst_batch, int num_uncommitted_tuples) {
+    DCHECK_LE(dst_batch->num_rows() + num_uncommitted_tuples, dst_batch->capacity());
+    // Copy rows that reference 'tuple_mem' into a new small buffer. This code handles
+    // the case where num_uncommitted_tuples == 0, since TryAllocate() returns a non-null
+    // pointer.
+    int64_t dst_bytes = TupleOffset(num_uncommitted_tuples);
+    uint8_t* dst_buffer = dst_batch->tuple_data_pool()->TryAllocate(dst_bytes);
+    if (dst_buffer == nullptr) return false;
+    const int end_row = dst_batch->num_rows() + num_uncommitted_tuples;
+    for (int i = dst_batch->num_rows(); i < end_row; ++i) {
+      TupleRow* row = dst_batch->GetRow(i);
+      Tuple* uncompacted_tuple = row->GetTuple(0);
+      DCHECK_GE(reinterpret_cast<uint8_t*>(uncompacted_tuple), tuple_mem);
+      DCHECK_LT(reinterpret_cast<uint8_t*>(uncompacted_tuple),
+          tuple_mem + TupleOffset(capacity));
+      row->SetTuple(0, reinterpret_cast<Tuple*>(dst_buffer));
+      memcpy(dst_buffer, uncompacted_tuple, tuple_byte_size);
+      dst_buffer += tuple_byte_size;
+    }
+    return true;
+  }
+
+  /// Byte offset of the tuple at index 'idx' within 'tuple_mem'. Computed in 64 bits
+  /// so the product of index and tuple size cannot overflow 'int'.
+  int64_t TupleOffset(int idx) const {
+    return static_cast<int64_t>(idx) * tuple_byte_size;
+  }
+
+  /// Returns the tuple stored at index 'idx' of 'tuple_mem'. The parameter is named
+  /// 'idx' so it does not shadow the 'tuple_idx' iteration member.
+  Tuple* GetTuple(int idx) const {
+    return reinterpret_cast<Tuple*>(tuple_mem + TupleOffset(idx));
+  }
+
+  /// Pointer to the tuple at the current iteration position 'tuple_idx'.
+  uint8_t* CurrTuple() const { return tuple_mem + TupleOffset(tuple_idx); }
+  /// Pointer one past the last valid tuple ('num_tuples' tuples in).
+  uint8_t* TupleEnd() const { return tuple_mem + TupleOffset(num_tuples); }
+  /// True once iteration has consumed all 'num_tuples' tuples.
+  bool AtEnd() const { return tuple_idx == num_tuples; }
+  /// Total bytes allocated by both pools of this batch.
+  int64_t total_allocated_bytes() const {
+    return tuple_mem_pool.total_allocated_bytes() + aux_mem_pool.total_allocated_bytes();
+  }
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-version-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-version-test.cc b/be/src/exec/parquet/parquet-version-test.cc
new file mode 100644
index 0000000..5eaa692
--- /dev/null
+++ b/be/src/exec/parquet/parquet-version-test.cc
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <iostream>
+
+#include "exec/parquet/parquet-metadata-utils.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+/// Parses 's' into a ParquetFileVersion and checks the application name, the
+/// major/minor/patch components and the Impala-internal flag against the expected
+/// values. Every failure message includes the input string so the failing case is
+/// identifiable. File-local helper, hence internal linkage.
+static void CheckVersionParse(const string& s, const string& expected_application,
+    int expected_major, int expected_minor, int expected_patch,
+    bool expected_is_internal) {
+  ParquetFileVersion v(s);
+  // gtest convention: expected value first, actual value second.
+  EXPECT_EQ(expected_application, v.application) << "String: " << s;
+  EXPECT_EQ(expected_major, v.version.major) << "String: " << s;
+  EXPECT_EQ(expected_minor, v.version.minor) << "String: " << s;
+  EXPECT_EQ(expected_patch, v.version.patch) << "String: " << s;
+  EXPECT_EQ(expected_is_internal, v.is_impala_internal) << "String: " << s;
+}
+
+TEST(ParquetVersionTest, Parsing) {
+  // One row per input: the string to parse, then the application name, major, minor
+  // and patch version, and whether it should be recognized as an internal build.
+  struct ParseCase {
+    const char* input;
+    const char* expected_app;
+    int expected_major_version;
+    int expected_minor_version;
+    int expected_patch_version;
+    bool expected_internal;
+  };
+  const ParseCase kCases[] = {
+      {"impala version 1.0", "impala", 1, 0, 0, false},
+      {"impala VERSION 1.0", "impala", 1, 0, 0, false},
+      {"impala VERSION 1.0 ignored", "impala", 1, 0, 0, false},
+      {"parquet-mr version 2.0", "parquet-mr", 2, 0, 0, false},
+
+      {"impala version 1.2", "impala", 1, 2, 0, false},
+      {"impala version 1.2.3", "impala", 1, 2, 3, false},
+      {"impala version 1.2.3-cdh4.5", "impala", 1, 2, 3, false},
+      {"impala version 1.2.3.cdh4.5", "impala", 1, 2, 3, false},
+      {"impala version 1.2-cdh4.5", "impala", 1, 2, 0, false},
+      {"impala version 1.2.cdh4.5", "impala", 1, 2, 0, false},
+      {"impala version 1.2 (build xyz)", "impala", 1, 2, 0, false},
+      {"impala version cdh4.5", "impala", 0, 0, 0, false},
+
+      // Internal versions: only the "impala" application sets the internal flag.
+      {"impala version 1.0-internal", "impala", 1, 0, 0, true},
+      {"impala version 1.23-internal", "impala", 1, 23, 0, true},
+      {"impala version 2-inTERnal", "impala", 2, 0, 0, true},
+      {"mr version 1-internal", "mr", 1, 0, 0, false},
+
+      // Malformed strings.
+      {"parquet-mr 2.0", "parquet-mr", 0, 0, 0, false},
+      {"impala ve 2.0", "impala", 0, 0, 0, false},
+      {"", "", 0, 0, 0, false},
+  };
+  for (const ParseCase& tc : kCases) {
+    CheckVersionParse(tc.input, tc.expected_app, tc.expected_major_version,
+        tc.expected_minor_version, tc.expected_patch_version, tc.expected_internal);
+  }
+}
+
+TEST(ParquetVersionTest, Comparisons) {
+  // Reference version is 1.2.3; every check below compares against it.
+  ParquetFileVersion parsed("foo version 1.2.3");
+
+  // Equality requires all three components to match.
+  EXPECT_TRUE(parsed.VersionEq(1, 2, 3));
+  EXPECT_FALSE(parsed.VersionEq(1, 2, 4));
+
+  // VersionLt holds for strictly greater versions...
+  EXPECT_TRUE(parsed.VersionLt(3, 2, 1));
+  EXPECT_TRUE(parsed.VersionLt(1, 2, 4));
+  EXPECT_TRUE(parsed.VersionLt(2, 0, 0));
+  // ...and fails for equal or smaller ones.
+  EXPECT_FALSE(parsed.VersionLt(0, 0, 0));
+  EXPECT_FALSE(parsed.VersionLt(1, 2, 3));
+  EXPECT_FALSE(parsed.VersionLt(1, 2, 2));
+  EXPECT_FALSE(parsed.VersionLt(0, 4, 4));
+}
+
+}
+
+IMPALA_TEST_MAIN();
+
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index dd70b36..74347cd 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -31,6 +31,7 @@ add_library(Util
backend-gflag-util.cc
benchmark.cc
bitmap.cc
+ bit-packing.cc
bit-util.cc
bloom-filter.cc
bloom-filter-ir.cc
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-packing-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing-test.cc b/be/src/util/bit-packing-test.cc
index bedf178..f1ec10a 100644
--- a/be/src/util/bit-packing-test.cc
+++ b/be/src/util/bit-packing-test.cc
@@ -21,7 +21,7 @@
#include "testutil/gtest-util.h"
#include "testutil/mem-util.h"
-#include "util/bit-packing.inline.h"
+#include "util/bit-packing.h"
#include "util/bit-stream-utils.inline.h"
#include "common/names.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-packing.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.cc b/be/src/util/bit-packing.cc
new file mode 100644
index 0000000..cb3233d
--- /dev/null
+++ b/be/src/util/bit-packing.cc
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/bit-packing.inline.h"
+
+#include "runtime/decimal-value.h"
+#include "runtime/string-value.h"
+#include "runtime/timestamp-value.h"
+
+namespace impala {
+
+// Instantiate all of the templated functions needed by the rest of Impala.
+// The macros deliberately omit the trailing semicolon: each use site supplies exactly
+// one, so no empty declarations (';;') are generated, which -Wextra-semi/-pedantic flag.
+#define INSTANTIATE_UNPACK_VALUES(OUT_TYPE) \
+  template std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues<OUT_TYPE>( \
+      int bit_width, const uint8_t* __restrict__ in, int64_t in_bytes, \
+      int64_t num_values, OUT_TYPE* __restrict__ out)
+
+INSTANTIATE_UNPACK_VALUES(bool);
+INSTANTIATE_UNPACK_VALUES(uint8_t);
+INSTANTIATE_UNPACK_VALUES(uint32_t);
+
+#undef INSTANTIATE_UNPACK_VALUES
+
+#define INSTANTIATE_UNPACK_AND_DECODE(OUT_TYPE) \
+  template std::pair<const uint8_t*, int64_t> \
+  BitPacking::UnpackAndDecodeValues<OUT_TYPE>(int bit_width, \
+      const uint8_t* __restrict__ in, int64_t in_bytes, OUT_TYPE* __restrict__ dict, \
+      int64_t dict_len, int64_t num_values, OUT_TYPE* __restrict__ out, int64_t stride, \
+      bool* __restrict__ decode_error)
+
+INSTANTIATE_UNPACK_AND_DECODE(bool);
+INSTANTIATE_UNPACK_AND_DECODE(double);
+INSTANTIATE_UNPACK_AND_DECODE(float);
+INSTANTIATE_UNPACK_AND_DECODE(int8_t);
+INSTANTIATE_UNPACK_AND_DECODE(int16_t);
+INSTANTIATE_UNPACK_AND_DECODE(int32_t);
+INSTANTIATE_UNPACK_AND_DECODE(int64_t);
+INSTANTIATE_UNPACK_AND_DECODE(Decimal4Value);
+INSTANTIATE_UNPACK_AND_DECODE(Decimal8Value);
+INSTANTIATE_UNPACK_AND_DECODE(Decimal16Value);
+INSTANTIATE_UNPACK_AND_DECODE(StringValue);
+INSTANTIATE_UNPACK_AND_DECODE(TimestampValue);
+
+#undef INSTANTIATE_UNPACK_AND_DECODE
+
+// Required for bit-packing-benchmark.cc.
+template
+const uint8_t* BitPacking::Unpack32Values(int bit_width, const uint8_t* __restrict__ in,
+    int64_t in_bytes, uint32_t* __restrict__ out);
+} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-packing.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.h b/be/src/util/bit-packing.h
index 38b39e2..c70b55e 100644
--- a/be/src/util/bit-packing.h
+++ b/be/src/util/bit-packing.h
@@ -15,15 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef IMPALA_UTIL_BIT_PACKING_H
-#define IMPALA_UTIL_BIT_PACKING_H
-
-namespace impala {
+#pragma once
+#include <cstddef>
#include <cstdint>
-
#include <utility>
+namespace impala {
+
/// Utilities for manipulating bit-packed values. Bit-packing is a technique for
/// compressing integer values that do not use the full range of the integer type.
/// E.g. an array of uint32_t values with range [0, 31] only uses the lower 5 bits
@@ -131,5 +130,3 @@ class BitPacking {
static int64_t NumValuesToUnpack(int bit_width, int64_t in_bytes, int64_t num_values);
};
}
-
-#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-packing.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.inline.h b/be/src/util/bit-packing.inline.h
index 6fa31cc..8cebe40 100644
--- a/be/src/util/bit-packing.inline.h
+++ b/be/src/util/bit-packing.inline.h
@@ -15,8 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef IMPALA_UTIL_BIT_PACKING_INLINE_H
-#define IMPALA_UTIL_BIT_PACKING_INLINE_H
+// This file contains the template implementations of the functions declared in
+// bit-packing.h. Include it only in files that need to instantiate those templates
+// directly. Most files should include bit-packing.h instead and rely on the explicit
+// instantiations in bit-packing.cc, which keeps compile times manageable.
+
+#pragma once
#include "util/bit-packing.h"
@@ -345,6 +349,4 @@ const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict
return in + BYTES_TO_READ;
#pragma pop_macro("DECODE_VALUES_CASE")
}
-}
-
-#endif
+} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h
index aa53c52..48f52da 100644
--- a/be/src/util/bit-stream-utils.inline.h
+++ b/be/src/util/bit-stream-utils.inline.h
@@ -21,7 +21,7 @@
#include "util/bit-stream-utils.h"
#include "common/compiler-util.h"
-#include "util/bit-packing.inline.h"
+#include "util/bit-packing.h"
namespace impala {
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index bf40301..cc7ef82 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -23,7 +23,7 @@
#include <boost/unordered_map.hpp>
#include "common/compiler-util.h"
-#include "exec/parquet-common.h"
+#include "exec/parquet/parquet-common.h"
#include "gutil/strings/substitute.h"
#include "runtime/mem-pool.h"
#include "runtime/string-value.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index ecf24f5..c30c30b 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -25,6 +25,7 @@
#include "runtime/string-value.inline.h"
#include "runtime/timestamp-value.h"
#include "testutil/gtest-util.h"
+#include "util/bit-packing.inline.h"
#include "util/dict-encoding.h"
#include "common/names.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index a50838c..8bb7665 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -35,7 +35,7 @@
#include <thrift/transport/TBufferTransports.h>
#pragma clang diagnostic pop
-#include "exec/parquet-common.h"
+#include "exec/parquet/parquet-common.h"
#include "runtime/mem-pool.h"
#include "util/codec.h"
#include "util/rle-encoding.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc
index c52659b..4406e46 100644
--- a/be/src/util/rle-test.cc
+++ b/be/src/util/rle-test.cc
@@ -23,8 +23,9 @@
#include <math.h>
#include "testutil/gtest-util.h"
-#include "util/rle-encoding.h"
+#include "util/bit-packing.inline.h"
#include "util/bit-stream-utils.inline.h"
+#include "util/rle-encoding.h"
#include "common/names.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 768a569..d8d40b9 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -370,6 +370,9 @@ error_codes = (
("PARQUET_TIMESTAMP_INVALID_TIME_OF_DAY", 121,
"Parquet file '$0' column '$1' contains a timestamp with invalid time of day. "
"The time of day should be 0 <= and < 24 hour (in nanoseconds)."),
+
+ ("PARQUET_CORRUPT_BOOL_VALUE", 122, "File '$0' is corrupt: error decoding BOOLEAN "
+ "value with encoding $1 at offset $2"),
)
import sys
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test
index 47fbc1a..3feb7c2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test
@@ -2,5 +2,5 @@
---- QUERY
select * from num_values_def_levels_mismatch
---- CATCH
-could not read all def levels for column '_c0'
+Could not read definition level, even though metadata states there are 1 values remaining in data page.
====