Posted to commits@impala.apache.org by cs...@apache.org on 2019/09/26 17:57:49 UTC
[impala] branch master updated: IMPALA-6433: Part 1: Extract page reading logic from ParquetColumnReader
This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new b7c081c IMPALA-6433: Part 1: Extract page reading logic from ParquetColumnReader
b7c081c is described below
commit b7c081cb9966166be8d20d6b75a541394a001ddc
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Tue May 14 12:00:39 2019 +0200
IMPALA-6433: Part 1: Extract page reading logic from ParquetColumnReader
Moved some responsibilities from parquet-column-readers.cc to the
new classes 'ParquetColumnChunkReader' and 'ParquetPageReader':
- reading pages from the ScanRange
- decompressing data when needed
The main motivation is to make the implementation of V2 data page
reading simpler by moving most of the parts that will differ between
V1 and V2 into a class of manageable complexity.
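In outline, the new layering looks roughly like this (method names as
they are used in the patch below; a simplified sketch, not the full
interfaces):

    // ParquetPageReader: reads page headers and raw page data from the
    // column chunk's ScanRange.
    Status ParquetPageReader::ReadPageHeader(bool* eos);
    Status ParquetPageReader::ReadPageData(uint8_t** data);

    // ParquetColumnChunkReader: owns a ParquetPageReader, decompresses
    // page data and copies buffers when value lifetimes require it.
    Status ParquetColumnChunkReader::ReadNextDataPage(
        bool* eos, uint8_t** data, int* data_size);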
Testing:
- ran Parquet-related scanner tests
Change-Id: Ic0289394adcb97a3529313030930c9c5b85aaa12
Reviewed-on: http://gerrit.cloudera.org:8080/13329
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
---
be/src/exec/parquet/CMakeLists.txt | 2 +
be/src/exec/parquet/hdfs-parquet-scanner.h | 3 +
be/src/exec/parquet/parquet-column-chunk-reader.cc | 308 ++++++++++++
be/src/exec/parquet/parquet-column-chunk-reader.h | 172 +++++++
be/src/exec/parquet/parquet-column-readers.cc | 554 ++++-----------------
be/src/exec/parquet/parquet-column-readers.h | 64 +--
be/src/exec/parquet/parquet-page-reader.cc | 249 +++++++++
be/src/exec/parquet/parquet-page-reader.h | 126 +++++
8 files changed, 973 insertions(+), 505 deletions(-)
diff --git a/be/src/exec/parquet/CMakeLists.txt b/be/src/exec/parquet/CMakeLists.txt
index ec33f93..4589774 100644
--- a/be/src/exec/parquet/CMakeLists.txt
+++ b/be/src/exec/parquet/CMakeLists.txt
@@ -33,6 +33,8 @@ add_library(Parquet
parquet-column-stats.cc
parquet-level-decoder.cc
parquet-metadata-utils.cc
+ parquet-column-chunk-reader.cc
+ parquet-page-reader.cc
parquet-common.cc
parquet-page-index.cc
)
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index ae47cf8..8e5e7bc 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -48,6 +48,7 @@ class BaseScalarColumnReader;
template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
class ScalarColumnReader;
class BoolColumnReader;
+class ParquetPageReader;
/// This scanner parses Parquet files located in HDFS, and writes the content as tuples in
/// the Impala in-memory representation of data, e.g. (tuples, rows, row batches).
@@ -376,6 +377,8 @@ class HdfsParquetScanner : public HdfsScanner {
friend class BoolColumnReader;
friend class HdfsParquetScannerTest;
friend class ParquetPageIndex;
+ friend class ParquetColumnChunkReader;
+ friend class ParquetPageReader;
/// Index of the current row group being processed. Initialized to -1 which indicates
/// that we have not started processing the first row group yet (GetNext() has not yet
diff --git a/be/src/exec/parquet/parquet-column-chunk-reader.cc b/be/src/exec/parquet/parquet-column-chunk-reader.cc
new file mode 100644
index 0000000..573fd04
--- /dev/null
+++ b/be/src/exec/parquet/parquet-column-chunk-reader.cc
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet/parquet-column-chunk-reader.h"
+
+#include <string>
+
+#include "runtime/mem-pool.h"
+#include "runtime/runtime-state.h"
+#include "util/codec.h"
+
+#include "common/names.h"
+
+using namespace impala::io;
+
+using parquet::Encoding;
+
+namespace impala {
+
+const string PARQUET_PAGE_MEM_LIMIT_EXCEEDED =
+ "ParquetColumnChunkReader::$0() failed to allocate $1 bytes for $2.";
+
+// In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
+// if this matches those versions and compatibility workarounds need to be used.
+static bool RequiresSkippedDictionaryHeaderCheck(
+ const ParquetFileVersion& v) {
+ if (v.application != "impala") return false;
+ return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
+}
+
+ParquetColumnChunkReader::ParquetColumnChunkReader(HdfsParquetScanner* parent,
+ string schema_name, int slot_id, ValueMemoryType value_mem_type)
+ : parent_(parent),
+ schema_name_(schema_name),
+ page_reader_(parent, schema_name),
+ slot_id_(slot_id),
+ data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())),
+ value_mem_type_(value_mem_type)
+{
+}
+
+ParquetColumnChunkReader::~ParquetColumnChunkReader() {}
+
+Status ParquetColumnChunkReader::InitColumnChunk(const HdfsFileDesc& file_desc,
+ const parquet::ColumnChunk& col_chunk, int row_group_idx,
+ std::vector<io::ScanRange::SubRange>&& sub_ranges) {
+ if (col_chunk.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED) {
+ RETURN_IF_ERROR(Codec::CreateDecompressor(nullptr, false,
+ ConvertParquetToImpalaCodec(col_chunk.meta_data.codec), &decompressor_));
+ }
+
+ RETURN_IF_ERROR(page_reader_.InitColumnChunk(file_desc, col_chunk,
+ row_group_idx, move(sub_ranges)));
+
+ return Status::OK();
+}
+
+void ParquetColumnChunkReader::Close(MemPool* mem_pool) {
+ if (mem_pool != nullptr && value_mem_type_ == ValueMemoryType::VAR_LEN_STR) {
+ mem_pool->AcquireData(data_page_pool_.get(), false);
+ } else {
+ data_page_pool_->FreeAll();
+ }
+
+ if (decompressor_ != nullptr) decompressor_->Close();
+}
+
+void ParquetColumnChunkReader::ReleaseResourcesOfLastPage(MemPool& mem_pool) {
+ if (value_mem_type_ == ValueMemoryType::VAR_LEN_STR) {
+ mem_pool.AcquireData(data_page_pool_.get(), false);
+ } else {
+ data_page_pool_->FreeAll();
+ }
+ // We don't hold any pointers to earlier pages in the stream - we can safely free
+ // any I/O or boundary buffer.
+ // TODO: is this really needed? The read in ReadPageHeader() will release these
+ // resources anyway.
+ stream()->ReleaseCompletedResources(false);
+}
+
+Status ParquetColumnChunkReader::StartScan() {
+ DCHECK_GT(io_reservation_, 0);
+ RETURN_IF_ERROR(page_reader_.StartScan(io_reservation_));
+ return Status::OK();
+}
+
+Status ParquetColumnChunkReader::SkipPageData() {
+ return page_reader_.SkipPageData();
+}
+
+Status ParquetColumnChunkReader::TryReadDictionaryPage(bool* is_dictionary_page,
+ bool* eos, bool skip_data, ScopedBuffer* uncompressed_buffer, uint8_t** dict_values,
+ int64_t* data_size, int* num_entries) {
+ RETURN_IF_ERROR(page_reader_.ReadPageHeader(eos));
+
+ *is_dictionary_page = CurrentPageHeader().__isset.dictionary_page_header;
+ if (*eos || !(*is_dictionary_page)) return Status::OK();
+
+ if (skip_data) return SkipPageData();
+
+ RETURN_IF_ERROR(ReadDictionaryData(uncompressed_buffer, dict_values,
+ data_size, num_entries));
+
+ return Status::OK();
+}
+
+Status ParquetColumnChunkReader::ReadDictionaryData(ScopedBuffer* uncompressed_buffer,
+ uint8_t** dict_values, int64_t* data_size, int* num_entries) {
+ const parquet::PageHeader& current_page_header = CurrentPageHeader();
+ const parquet::DictionaryPageHeader* dict_header = nullptr;
+ if (current_page_header.__isset.dictionary_page_header) {
+ dict_header = &current_page_header.dictionary_page_header;
+ } else {
+ if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
+ return Status("Dictionary page does not have dictionary header set.");
+ }
+ }
+ if (dict_header != nullptr &&
+ dict_header->encoding != Encoding::PLAIN &&
+ dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
+ return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
+ "for dictionary pages.");
+ }
+
+ *data_size = current_page_header.compressed_page_size;
+ uint8_t* data = nullptr;
+ RETURN_IF_ERROR(page_reader_.ReadPageData(&data));
+
+ if (current_page_header.uncompressed_page_size == 0) {
+ // The size of the dictionary can be 0 if every value is null.
+ *data_size = 0;
+ *num_entries = 0;
+ *dict_values = nullptr;
+
+ return Status::OK();
+ }
+
+ // There are 3 different cases from the aspect of memory management:
+ // 1. If the column type is string, the dictionary will contain pointers to a buffer,
+ // so the buffer's lifetime must be as long as any row batch that references it.
+ // 2. If the column type is not string, and the dictionary page is compressed, then a
+ // temporary buffer is needed for the uncompressed values.
+ // 3. If the column type is not string, and the dictionary page is not compressed,
+ // then no buffer is necessary.
+ const bool copy_buffer = value_mem_type_ == ValueMemoryType::FIXED_LEN_STR
+ || value_mem_type_ == ValueMemoryType::VAR_LEN_STR;
+
+ if (decompressor_.get() != nullptr || copy_buffer) {
+ int buffer_size = current_page_header.uncompressed_page_size;
+ if (copy_buffer) {
+ *dict_values = parent_->dictionary_pool_->TryAllocate(buffer_size); // case 1.
+ } else if (uncompressed_buffer->TryAllocate(buffer_size)) {
+ *dict_values = uncompressed_buffer->buffer(); // case 2
+ }
+ if (UNLIKELY(*dict_values == nullptr)) {
+ string details = Substitute(PARQUET_PAGE_MEM_LIMIT_EXCEEDED, "InitDictionary",
+ buffer_size, "dictionary");
+ return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
+ parent_->state_, details, buffer_size);
+ }
+ } else {
+ *dict_values = data; // case 3.
+ }
+
+ if (decompressor_.get() != nullptr) {
+ int uncompressed_size = current_page_header.uncompressed_page_size;
+ RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, *data_size, data,
+ &uncompressed_size, dict_values));
+ VLOG_FILE << "Decompressed " << *data_size << " to " << uncompressed_size;
+ if (current_page_header.uncompressed_page_size != uncompressed_size) {
+ return Status(Substitute("Error decompressing dictionary page in file '$0'. "
+ "Expected $1 uncompressed bytes but got $2", filename(),
+ current_page_header.uncompressed_page_size, uncompressed_size));
+ }
+ *data_size = uncompressed_size;
+ } else {
+ if (current_page_header.uncompressed_page_size != *data_size) {
+ return Status(Substitute("Error reading dictionary page in file '$0'. "
+ "Expected $1 bytes but got $2", filename(),
+ current_page_header.uncompressed_page_size, *data_size));
+ }
+ if (copy_buffer) memcpy(*dict_values, data, *data_size);
+ }
+ *num_entries = dict_header->num_values;
+
+ return Status::OK();
+}
+
+Status ParquetColumnChunkReader::ReadNextDataPage(bool* eos, uint8_t** data,
+ int* data_size) {
+ // Read the next data page, skipping page types we don't care about. This method should
+ // be called after we know that the first page is not a dictionary page. Therefore, if
+ // we find a dictionary page, it is an error in the parquet file and we return a non-ok
+ // status (returned by page_reader_.ReadPageHeader()).
+ bool next_data_page_found = false;
+ while (!next_data_page_found) {
+ RETURN_IF_ERROR(page_reader_.ReadPageHeader(eos));
+
+ const parquet::PageHeader current_page_header = CurrentPageHeader();
+ DCHECK(page_reader_.PageHeadersRead() > 0
+ || !current_page_header.__isset.dictionary_page_header)
+ << "Should not call this method on the first page if it is a dictionary.";
+
+ if (*eos) return Status::OK();
+
+ if (current_page_header.type == parquet::PageType::DATA_PAGE) {
+ next_data_page_found = true;
+ } else {
+ // We can safely skip non-data pages
+ RETURN_IF_ERROR(SkipPageData());
+ }
+ }
+
+ return ReadDataPageData(data, data_size);
+}
+
+Status ParquetColumnChunkReader::ReadDataPageData(uint8_t** data, int* data_size) {
+ const parquet::PageHeader& current_page_header = CurrentPageHeader();
+
+ int compressed_size = current_page_header.compressed_page_size;
+ int uncompressed_size = current_page_header.uncompressed_page_size;
+ uint8_t* compressed_data;
+
+ RETURN_IF_ERROR(page_reader_.ReadPageData(&compressed_data));
+
+ const bool has_slot_desc = value_mem_type_ != ValueMemoryType::NO_SLOT_DESC;
+
+ *data_size = uncompressed_size;
+ if (decompressor_.get() != nullptr) {
+ SCOPED_TIMER(parent_->decompress_timer_);
+ uint8_t* decompressed_buffer;
+ RETURN_IF_ERROR(AllocateUncompressedDataPage(
+ uncompressed_size, "decompressed data", &decompressed_buffer));
+ RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
+ compressed_size, compressed_data, &uncompressed_size,
+ &decompressed_buffer));
+ // TODO: can't we call stream_->ReleaseCompletedResources(false); at this point?
+ VLOG_FILE << "Decompressed " << current_page_header.compressed_page_size
+ << " to " << uncompressed_size;
+ if (current_page_header.uncompressed_page_size != uncompressed_size) {
+ return Status(Substitute("Error decompressing data page in file '$0'. "
+ "Expected $1 uncompressed bytes but got $2", filename(),
+ current_page_header.uncompressed_page_size, uncompressed_size));
+ }
+ *data = decompressed_buffer;
+
+ if (has_slot_desc) {
+ parent_->scan_node_->UpdateBytesRead(slot_id_, uncompressed_size, compressed_size);
+ parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
+ parent_->UpdateCompressedPageSizeCounter(compressed_size);
+ }
+ } else {
+ if (compressed_size != uncompressed_size) {
+ return Status(Substitute("Error reading data page in file '$0'. "
+ "Expected $1 bytes but got $2", filename(),
+ compressed_size, uncompressed_size));
+ }
+
+ const bool copy_buffer = value_mem_type_ == ValueMemoryType::VAR_LEN_STR;
+
+ if (copy_buffer) {
+ // In this case returned batches will have pointers into the data page itself.
+ // We don't transfer disk I/O buffers out of the scanner so we need to copy
+ // the page data so that it can be attached to output batches.
+ uint8_t* buffer;
+ RETURN_IF_ERROR(AllocateUncompressedDataPage(
+ uncompressed_size, "uncompressed variable-length data", &buffer));
+ memcpy(buffer, compressed_data, uncompressed_size);
+ *data = buffer;
+ } else {
+ *data = compressed_data;
+ }
+ if (has_slot_desc) {
+ parent_->scan_node_->UpdateBytesRead(slot_id_, uncompressed_size, 0);
+ parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
+ }
+ }
+
+ return Status::OK();
+}
+
+Status ParquetColumnChunkReader::AllocateUncompressedDataPage(int64_t size,
+ const char* err_ctx, uint8_t** buffer) {
+ *buffer = data_page_pool_->TryAllocate(size);
+ if (*buffer == nullptr) {
+ string details =
+ Substitute(PARQUET_PAGE_MEM_LIMIT_EXCEEDED, "ReadDataPage", size, err_ctx);
+ return data_page_pool_->mem_tracker()->MemLimitExceeded(
+ parent_->state_, details, size);
+ }
+ return Status::OK();
+}
+
+}
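The decompression path above appears twice (dictionary pages and data
pages). As a standalone illustration, here is a minimal sketch of the
Codec round-trip this file relies on; the signatures are copied from the
calls above, while 'header', 'pool' and 'compressed_data' are hypothetical
stand-ins for the surrounding state:

    boost::scoped_ptr<Codec> decompressor;
    // Create a decompressor matching the column chunk's codec.
    RETURN_IF_ERROR(Codec::CreateDecompressor(nullptr, /* reuse_buffer */ false,
        ConvertParquetToImpalaCodec(col_chunk.meta_data.codec), &decompressor));

    int uncompressed_size = header.uncompressed_page_size;
    uint8_t* out = pool->TryAllocate(uncompressed_size);  // caller owns the buffer
    RETURN_IF_ERROR(decompressor->ProcessBlock32(/* output_preallocated */ true,
        header.compressed_page_size, compressed_data, &uncompressed_size, &out));
    // For a well-formed page the decompressor must report exactly
    // header.uncompressed_page_size output bytes; the code above returns an
    // error status otherwise.
    decompressor->Close();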
diff --git a/be/src/exec/parquet/parquet-column-chunk-reader.h b/be/src/exec/parquet/parquet-column-chunk-reader.h
new file mode 100644
index 0000000..2d685bb
--- /dev/null
+++ b/be/src/exec/parquet/parquet-column-chunk-reader.h
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <boost/scoped_ptr.hpp>
+
+#include "exec/parquet/hdfs-parquet-scanner.h"
+#include "exec/parquet/parquet-page-reader.h"
+
+namespace impala {
+
+class MemPool;
+class Codec;
+
+/// A class to read data from Parquet pages. It handles the page headers, decompression
+/// and the possible copying of the data buffers.
+/// Before reading, InitColumnChunk(), set_io_reservation() and StartScan() must be called
+/// in this order.
+class ParquetColumnChunkReader {
+ public:
+
+ /// An enum containing information about the type and/or intended use of the values. It
+ /// is used to make decisions about memory management, for example when a buffer needs
+ /// to be copied.
+ /// In the future, more variants could be added if a new use case needs different memory
+ /// management.
+ enum class ValueMemoryType {
+ /// The values will not be read.
+ NO_SLOT_DESC,
+
+ /// Scalar (non-string) values.
+ SCALAR,
+ FIXED_LEN_STR,
+ VAR_LEN_STR
+ };
+
+ const char* filename() const { return parent_->filename(); }
+
+ const parquet::PageHeader& CurrentPageHeader() const {
+ return page_reader_.CurrentPageHeader();
+ }
+
+ io::ScanRange* scan_range() const { return page_reader_.scan_range(); }
+ parquet::PageType::type page_type() const { return CurrentPageHeader().type; }
+ ScannerContext::Stream* stream() const { return page_reader_.stream(); }
+
+ parquet::Encoding::type encoding() const {
+ return CurrentPageHeader().data_page_header.encoding;
+ }
+
+ /// Defined in the .cc file so that classes held in scoped_ptr members can be
+ /// forward declared here.
+ ParquetColumnChunkReader(HdfsParquetScanner* parent, string schema_name, int slot_id,
+ ValueMemoryType value_mem_type);
+ ~ParquetColumnChunkReader();
+
+ /// Resets the reader for each row group in the file and creates the scan
+ /// range for the column, but does not start it. To start scanning,
+ /// set_io_reservation() must be called to assign reservation to this
+ /// column, followed by StartScan().
+ Status InitColumnChunk(
+ const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk,
+ int row_group_idx, std::vector<io::ScanRange::SubRange>&& sub_ranges);
+
+ void set_io_reservation(int bytes) {
+ io_reservation_ = bytes;
+ }
+
+ /// Starts the column scan range. InitColumnChunk() has to have been called and the
+ /// reader must have a reservation assigned via set_io_reservation(). This must be
+ /// called before any of the column data can be read (including dictionary and data
+ /// pages). Returns an error status if there was an error starting the scan or
+ /// allocating buffers for it.
+ Status StartScan();
+
+ /// If the column type is a variable length string and 'mem_pool' is not NULL, transfers
+ /// the remaining resources backing tuples to 'mem_pool' and frees up other resources.
+ /// Otherwise frees all resources.
+ void Close(MemPool* mem_pool);
+
+ /// The following functions can all advance stream_, which invalidates the buffer
+ /// returned by the previous call (unless copy_buffer was true).
+
+ /// Checks whether the next page is a dictionary page and if it is, reads the header and
+ /// either reads or skips the dictionary data, depending on 'skip_data'.
+ ///
+ /// After this method returns, the value of '*is_dictionary' can be used to determine
+ /// whether the page was a dictionary page.
+ /// If the data is read, '*dict_values' is set to point to the data and '*data_size' is
+ /// set to the length of the data. '*num_entries' is set to the number of elements. If
+ /// the column type is a string, then the buffer is allocated from the scanner's
+ /// dictionary_pool_ and will be valid as long as the scanner lives. Otherwise the
+ /// returned buffer will be valid only until the next function call that advances the
+ /// buffer.
+ /// 'uncompressed_buffer' is used to store data if a temporary buffer is needed for
+ /// decompression.
+ Status TryReadDictionaryPage(bool* is_dictionary_page, bool* eos, bool skip_data,
+ ScopedBuffer* uncompressed_buffer, uint8_t** dict_values,
+ int64_t* data_size, int* num_entries);
+
+ /// Reads the next data page to '*data' and '*data_size'.
+ /// Skips other types of pages (except for dictionary) until it finds a data page. If it
+ /// finds a dictionary page, returns an error as the dictionary page should be the first
+ /// page and this method should only be called if a data page is expected.
+ /// If the stream reaches the end before reading a complete page header, '*eos' is set
+ /// to true.
+ Status ReadNextDataPage(bool* eos, uint8_t** data, int* data_size);
+
+ /// If the column type is a variable length string, transfers the remaining resources
+ /// backing tuples to 'mem_pool' and frees up other resources. Otherwise frees all
+ /// resources.
+ void ReleaseResourcesOfLastPage(MemPool& mem_pool);
+
+ private:
+ HdfsParquetScanner* parent_;
+ string schema_name_;
+
+ ParquetPageReader page_reader_;
+
+ /// Used to track reads in the scanner's counters.
+ int slot_id_;
+
+ /// Pool to allocate storage for data pages from - either decompression buffers for
+ /// compressed data pages or copies of the data page with var-len data to attach to
+ /// batches.
+ boost::scoped_ptr<MemPool> data_page_pool_;
+
+ /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set
+ /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group
+ /// by Reset().
+ int64_t io_reservation_ = 0;
+
+ boost::scoped_ptr<Codec> decompressor_;
+
+ /// See TryReadDictionaryPage() for information about the parameters.
+ Status ReadDictionaryData(ScopedBuffer* uncompressed_buffer, uint8_t** dict_values,
+ int64_t* data_size, int* num_entries);
+
+ /// Reads the data part of the next data page. Sets '*data' to point to the buffer and
+ /// '*data_size' to its size.
+ /// If the column type is a variable length string, the buffer is allocated from
+ /// data_page_pool_. Otherwise the returned buffer will be valid only until the next
+ /// function call that advances the buffer.
+ Status ReadDataPageData(uint8_t** data, int* data_size);
+
+ /// Skips the data part of the page. The header must be already read.
+ Status SkipPageData();
+
+ /// Allocate memory for the uncompressed contents of a data page of 'size' bytes from
+ /// 'data_page_pool_'. 'err_ctx' provides context for error messages. On success,
+ /// 'buffer' points to the allocated memory. Otherwise an error status is returned.
+ Status AllocateUncompressedDataPage(
+ int64_t size, const char* err_ctx, uint8_t** buffer);
+
+ ValueMemoryType value_mem_type_;
+};
+
+} // namespace impala
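The contract documented above ("InitColumnChunk(), set_io_reservation() and
StartScan() must be called in this order") translates into roughly the
following caller sequence. This is a hypothetical sketch built from the
declarations above; 'scanner', 'file_desc', 'col_chunk', 'row_group_idx' and
'reservation' are placeholders, and dictionary handling is elided:

    ParquetColumnChunkReader reader(scanner, "col_name", /* slot_id */ 0,
        ParquetColumnChunkReader::ValueMemoryType::SCALAR);
    RETURN_IF_ERROR(reader.InitColumnChunk(file_desc, col_chunk, row_group_idx, {}));
    reader.set_io_reservation(reservation);
    RETURN_IF_ERROR(reader.StartScan());

    bool eos = false;
    while (!eos) {
      uint8_t* data;
      int data_size;
      RETURN_IF_ERROR(reader.ReadNextDataPage(&eos, &data, &data_size));
      if (eos) break;
      // ... decode 'data_size' bytes starting at 'data' ...
    }
    reader.Close(nullptr);  // no batch pool passed: frees all page memory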
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index f044b86..ff237a9 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -17,34 +17,19 @@
#include "parquet-column-readers.h"
-#include <sstream>
#include <string>
-#include <boost/scoped_ptr.hpp>
-#include <gflags/gflags.h>
#include <gutil/strings/substitute.h>
#include "exec/parquet/hdfs-parquet-scanner.h"
#include "exec/parquet/parquet-bool-decoder.h"
#include "exec/parquet/parquet-level-decoder.h"
#include "exec/parquet/parquet-metadata-utils.h"
-#include "exec/parquet/parquet-scratch-tuple-batch.h"
-#include "exec/read-write-util.h"
-#include "exec/scanner-context.inline.h"
#include "parquet-collection-column-reader.h"
-#include "rpc/thrift-util.h"
-#include "runtime/exec-env.h"
-#include "runtime/io/disk-io-mgr.h"
-#include "runtime/io/request-context.h"
#include "runtime/runtime-state.h"
-#include "runtime/tuple-row.h"
#include "runtime/tuple.h"
-#include "util/bit-util.h"
-#include "util/codec.h"
#include "util/debug-util.h"
#include "util/dict-encoding.h"
-#include "util/pretty-printer.h"
#include "util/rle-encoding.h"
-#include "util/ubsan.h"
#include "common/names.h"
@@ -53,25 +38,12 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
"When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
"be converted from UTC to local time. Writes are unaffected.");
-// Max dictionary page header size in bytes. This is an estimate and only needs to be an
-// upper bound.
-static const int MAX_DICT_HEADER_SIZE = 100;
-
-// Max data page header size in bytes. This is an estimate and only needs to be an upper
-// bound. It is theoretically possible to have a page header of any size due to string
-// value statistics, but in practice we'll have trouble reading string values this large.
-// Also, this limit is in place to prevent impala from reading corrupt parquet files.
-DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
-
using namespace impala::io;
using parquet::Encoding;
namespace impala {
-const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
- "ParquetColumnReader::$0() failed to allocate $1 bytes for $2.";
-
// Definition of variable declared in header for use of the
// SHOULD_TRIGGER_COL_READER_DEBUG_ACTION macro.
int parquet_column_reader_debug_count = 0;
@@ -268,20 +240,19 @@ class ScalarColumnReader : public BaseScalarColumnReader {
/// Pull out slow-path Status construction code
void __attribute__((noinline)) SetDictDecodeError() {
parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(),
- slot_desc_->type().DebugString(), stream_->file_offset());
+ slot_desc_->type().DebugString(), col_chunk_reader_.stream()->file_offset());
}
void __attribute__((noinline)) SetPlainDecodeError() {
parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(),
- slot_desc_->type().DebugString(), stream_->file_offset());
+ slot_desc_->type().DebugString(), col_chunk_reader_.stream()->file_offset());
}
void __attribute__((noinline)) SetBoolDecodeError() {
parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_BOOL_VALUE, filename(),
- PrintThriftEnum(page_encoding_), stream_->file_offset());
+ PrintThriftEnum(page_encoding_), col_chunk_reader_.stream()->file_offset());
}
-
/// Dictionary decoder for decoding column values.
DictDecoder<InternalType> dict_decoder_;
@@ -348,7 +319,7 @@ Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPag
DCHECK_GE(size, 0);
DCHECK(slot_desc_ == nullptr || slot_desc_->type().type != TYPE_BOOLEAN)
<< "Bool has specialized impl";
- page_encoding_ = current_page_header_.data_page_header.encoding;
+ page_encoding_ = col_chunk_reader_.encoding();
if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY
&& page_encoding_ != parquet::Encoding::PLAIN) {
return GetUnsupportedDecodingError();
@@ -381,7 +352,7 @@ Status ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::InitDataPage(
uint8_t* data, int size) {
// Data can be empty if the column contains all NULLs
DCHECK_GE(size, 0);
- page_encoding_ = current_page_header_.data_page_header.encoding;
+ page_encoding_ = col_chunk_reader_.encoding();
/// Boolean decoding is delegated to 'bool_decoder_'.
if (bool_decoder_->SetData(page_encoding_, data, size)) return Status::OK();
@@ -1040,14 +1011,6 @@ bool ScalarColumnReader<DateValue, parquet::Type::INT32, true>::ValidateValue(
return true;
}
-// In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
-// if this matches those versions and compatibility workarounds need to be used.
-static bool RequiresSkippedDictionaryHeaderCheck(
- const ParquetFileVersion& v) {
- if (v.application != "impala") return false;
- return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
-}
-
void BaseScalarColumnReader::CreateSubRanges(vector<ScanRange::SubRange>* sub_ranges) {
sub_ranges->clear();
if (!DoesPageFiltering()) return;
@@ -1078,10 +1041,9 @@ Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
parent_->filename(), row_group_idx, col_idx(), schema_element(),
parent_->state_));
num_buffered_values_ = 0;
+
data_ = nullptr;
data_end_ = nullptr;
- stream_ = nullptr;
- io_reservation_ = 0;
metadata_ = &col_chunk.meta_data;
num_values_read_ = 0;
def_level_ = ParquetLevel::INVALID_LEVEL;
@@ -1089,292 +1051,62 @@ Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
rep_level_ = max_rep_level() == 0 ? 0 : ParquetLevel::INVALID_LEVEL;
pos_current_value_ = ParquetLevel::INVALID_POS;
- if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
- RETURN_IF_ERROR(Codec::CreateDecompressor(
- nullptr, false, ConvertParquetToImpalaCodec(metadata_->codec), &decompressor_));
- }
- int64_t col_start = col_chunk.meta_data.data_page_offset;
- if (col_chunk.meta_data.__isset.dictionary_page_offset) {
- // Already validated in ValidateColumnOffsets()
- DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
- col_start = col_chunk.meta_data.dictionary_page_offset;
- }
- int64_t col_len = col_chunk.meta_data.total_compressed_size;
- if (col_len <= 0) {
- return Status(Substitute("File '$0' contains invalid column chunk size: $1",
- filename(), col_len));
- }
- int64_t col_end = col_start + col_len;
-
- // Already validated in ValidateColumnOffsets()
- DCHECK_GT(col_end, 0);
- DCHECK_LT(col_end, file_desc.file_length);
- const ParquetFileVersion& file_version = parent_->file_version_;
- if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) {
- // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
- // dictionary page header size in total_compressed_size and total_uncompressed_size
- // (see IMPALA-694). We pad col_len to compensate.
- int64_t bytes_remaining = file_desc.file_length - col_end;
- int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
- col_len += pad;
- col_end += pad;
- }
-
- // TODO: this will need to change when we have co-located files and the columns
- // are different files.
- if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
- return Status(Substitute("Expected parquet column file path '$0' to match "
- "filename '$1'", col_chunk.file_path, filename()));
- }
-
- const ScanRange* metadata_range = parent_->metadata_range_;
- int64_t partition_id = parent_->context_->partition_descriptor()->id();
- const ScanRange* split_range =
- static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
- // Determine if the column is completely contained within a local split.
- bool col_range_local = split_range->expected_local()
- && col_start >= split_range->offset()
- && col_end <= split_range->offset() + split_range->len();
vector<ScanRange::SubRange> sub_ranges;
CreateSubRanges(&sub_ranges);
- scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
- filename(), col_len, col_start, move(sub_ranges), partition_id,
- split_range->disk_id(),col_range_local, split_range->is_erasure_coded(),
- file_desc.mtime, BufferOpts(split_range->cache_options()));
+
+ RETURN_IF_ERROR(col_chunk_reader_.InitColumnChunk(
+ file_desc, col_chunk, row_group_idx, move(sub_ranges)));
+
ClearDictionaryDecoder();
return Status::OK();
}
void BaseScalarColumnReader::Close(RowBatch* row_batch) {
- if (row_batch != nullptr && PageContainsTupleData(page_encoding_)) {
- row_batch->tuple_data_pool()->AcquireData(data_page_pool_.get(), false);
- } else {
- data_page_pool_->FreeAll();
- }
- if (decompressor_ != nullptr) decompressor_->Close();
+ col_chunk_reader_.Close(row_batch == nullptr ? nullptr : row_batch->tuple_data_pool());
DictDecoderBase* dict_decoder = GetDictionaryDecoder();
if (dict_decoder != nullptr) dict_decoder->Close();
}
-Status BaseScalarColumnReader::StartScan() {
- DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan.";
- DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
- ScannerContext* context = parent_->context_;
- DCHECK_GT(io_reservation_, 0);
- bool needs_buffers;
- RETURN_IF_ERROR(parent_->scan_node_->reader_context()->StartScanRange(
- scan_range_, &needs_buffers));
- if (needs_buffers) {
- RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
- context->bp_client(), scan_range_, io_reservation_));
- }
- stream_ = parent_->context_->AddStream(scan_range_, io_reservation_);
- DCHECK(stream_ != nullptr);
- return Status::OK();
-}
-
-Status BaseScalarColumnReader::ReadPageHeader(bool peek,
- parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) {
- DCHECK(stream_ != nullptr);
- *eos = false;
-
- uint8_t* buffer;
- int64_t buffer_size;
- RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
- // check for end of stream
- if (buffer_size == 0) {
- // The data pages contain fewer values than stated in the column metadata.
- DCHECK(stream_->eosr());
- DCHECK_LT(num_values_read_, metadata_->num_values);
- // TODO for 2.3: node_.element->name isn't necessarily useful
- ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID, metadata_->num_values,
- num_values_read_, node_.element->name, filename());
- RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
- *eos = true;
- return Status::OK();
- }
-
- // We don't know the actual header size until the thrift object is deserialized. Loop
- // until we successfully deserialize the header or exceed the maximum header size.
- uint32_t header_size;
- Status status;
- while (true) {
- header_size = buffer_size;
- status = DeserializeThriftMsg(buffer, &header_size, true, next_page_header);
- if (status.ok()) break;
-
- if (buffer_size >= FLAGS_max_page_header_size) {
- stringstream ss;
- ss << "ParquetScanner: could not read data page because page header exceeded "
- << "maximum size of "
- << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
- status.AddDetail(ss.str());
- return status;
- }
-
- // Didn't read entire header, increase buffer size and try again
- int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
- status = Status::OK();
- bool success = stream_->GetBytes(
- new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
- if (!success) {
- DCHECK(!status.ok());
- return status;
- }
- DCHECK(status.ok());
-
- // Even though we increased the allowed buffer size, the number of bytes
- // read did not change. The header is not limited by the buffer space,
- // so it must be incomplete in the file.
- if (buffer_size == new_buffer_size) {
- DCHECK_NE(new_buffer_size, 0);
- return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
- }
- DCHECK_GT(new_buffer_size, buffer_size);
- buffer_size = new_buffer_size;
- }
-
- *next_header_size = header_size;
-
- // Successfully deserialized current_page_header_
- if (!peek && !stream_->SkipBytes(header_size, &status)) return status;
-
- int data_size = next_page_header->compressed_page_size;
- if (UNLIKELY(data_size < 0)) {
- return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for "
- "column '$2'", filename(), data_size, schema_element().name));
- }
- int uncompressed_size = next_page_header->uncompressed_page_size;
- if (UNLIKELY(uncompressed_size < 0)) {
- return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page "
- "size $1 for column '$2'", filename(), uncompressed_size,
- schema_element().name));
- }
-
- return Status::OK();
-}
-
Status BaseScalarColumnReader::InitDictionary() {
- // Peek at the next page header
- bool eos;
- parquet::PageHeader next_page_header;
- uint32_t next_header_size;
- DCHECK(stream_ != nullptr);
- DCHECK(!HasDictionaryDecoder());
-
- RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header,
- &next_header_size, &eos));
- if (eos) return Status::OK();
- // The dictionary must be the first data page, so if the first page
- // is not a dictionary, then there is no dictionary.
- if (Ubsan::EnumToInt(&next_page_header.type) != parquet::PageType::DICTIONARY_PAGE) {
- return Status::OK();
- }
-
- current_page_header_ = next_page_header;
- Status status;
- if (!stream_->SkipBytes(next_header_size, &status)) return status;
-
- int data_size = current_page_header_.compressed_page_size;
- if (slot_desc_ == nullptr) {
- // Skip processing the dictionary page if we don't need to decode any values. In
- // addition to being unnecessary, we are likely unable to successfully decode the
- // dictionary values because we don't necessarily create the right type of scalar
- // reader if there's no slot to read into (see CreateReader()).
- if (!stream_->SkipBytes(data_size, &status)) return status;
- return Status::OK();
- }
+ // Dictionary encoding is not supported for booleans.
+ const bool is_boolean = node_.element->type == parquet::Type::BOOLEAN;
+ const bool skip_data = slot_desc_ == nullptr || is_boolean;
- if (node_.element->type == parquet::Type::BOOLEAN) {
+ // TODO: maybe avoid malloc on every page?
+ ScopedBuffer uncompressed_buffer(parent_->dictionary_pool_->mem_tracker());
+ bool eos;
+ bool is_dictionary_page;
+ int64_t data_size;
+ int num_entries;
+ RETURN_IF_ERROR(col_chunk_reader_.TryReadDictionaryPage(&is_dictionary_page, &eos,
+ skip_data, &uncompressed_buffer, &data_, &data_size, &num_entries));
+ if (eos) return HandleTooEarlyEos();
+ if (is_dictionary_page && is_boolean) {
return Status("Unexpected dictionary page. Dictionary page is not"
- " supported for booleans.");
+ " supported for booleans.");
}
-
- const parquet::DictionaryPageHeader* dict_header = nullptr;
- if (current_page_header_.__isset.dictionary_page_header) {
- dict_header = &current_page_header_.dictionary_page_header;
- } else {
- if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
- return Status("Dictionary page does not have dictionary header set.");
- }
- }
- if (dict_header != nullptr &&
- dict_header->encoding != Encoding::PLAIN &&
- dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
- return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
- "for dictionary pages.");
- }
-
- if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
- data_end_ = data_ + data_size;
+ if (!is_dictionary_page || skip_data) return Status::OK();
// The size of dictionary can be 0, if every value is null. The dictionary still has to
// be reset in this case.
DictDecoderBase* dict_decoder;
- if (current_page_header_.uncompressed_page_size == 0) {
+ if (data_size == 0) {
+ data_end_ = data_;
return CreateDictionaryDecoder(nullptr, 0, &dict_decoder);
}
-
- // There are 3 different cases from the aspect of memory management:
- // 1. If the column type is string, the dictionary will contain pointers to a buffer,
- // so the buffer's lifetime must be as long as any row batch that references it.
- // 2. If the column type is not string, and the dictionary page is compressed, then a
- // temporary buffer is needed for the uncompressed values.
- // 3. If the column type is not string, and the dictionary page is not compressed,
- // then no buffer is necessary.
- ScopedBuffer uncompressed_buffer(parent_->dictionary_pool_->mem_tracker());
- uint8_t* dict_values = nullptr;
- if (decompressor_.get() != nullptr || slot_desc_->type().IsStringType()) {
- int buffer_size = current_page_header_.uncompressed_page_size;
- if (slot_desc_->type().IsStringType()) {
- dict_values = parent_->dictionary_pool_->TryAllocate(buffer_size); // case 1.
- } else if (uncompressed_buffer.TryAllocate(buffer_size)) {
- dict_values = uncompressed_buffer.buffer(); // case 2
- }
- if (UNLIKELY(dict_values == nullptr)) {
- string details = Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "InitDictionary",
- buffer_size, "dictionary");
- return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
- parent_->state_, details, buffer_size);
- }
- } else {
- dict_values = data_; // case 3.
- }
-
- if (decompressor_.get() != nullptr) {
- int uncompressed_size = current_page_header_.uncompressed_page_size;
- const Status& status = decompressor_->ProcessBlock32(true, data_size, data_,
- &uncompressed_size, &dict_values);
- if (!status.ok()) {
- return Status(Substitute("Error decompressing parquet file '$0' column '$1'"
- " data_page_offset $2: $3", filename(), node_.element->name,
- metadata_->data_page_offset, status.GetDetail()));
- }
- VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
- if (current_page_header_.uncompressed_page_size != uncompressed_size) {
- return Status(Substitute("Error decompressing dictionary page in file '$0'. "
- "Expected $1 uncompressed bytes but got $2", filename(),
- current_page_header_.uncompressed_page_size, uncompressed_size));
- }
- } else {
- if (current_page_header_.uncompressed_page_size != data_size) {
- return Status(Substitute("Error reading dictionary page in file '$0'. "
- "Expected $1 bytes but got $2", filename(),
- current_page_header_.uncompressed_page_size, data_size));
- }
- if (slot_desc_->type().IsStringType()) memcpy(dict_values, data_, data_size);
+ // We cannot add data_size to data_ until we know it is not a nullptr.
+ if (data_ == nullptr) {
+ return Status("The dictionary values could not be read properly.");
}
+ data_end_ = data_ + data_size;
- RETURN_IF_ERROR(CreateDictionaryDecoder(
- dict_values, current_page_header_.uncompressed_page_size, &dict_decoder));
- if (dict_header != nullptr &&
- dict_header->num_values != dict_decoder->num_entries()) {
+ RETURN_IF_ERROR(CreateDictionaryDecoder(data_, data_size, &dict_decoder));
+ if (num_entries != dict_decoder->num_entries()) {
return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
- slot_desc_->type().DebugString(),
- Substitute("Expected $0 entries but data contained $1 entries",
- dict_header->num_values, dict_decoder->num_entries()));
+ slot_desc_->type().DebugString(),
+ Substitute("Expected $0 entries but data contained $1 entries",
+ num_entries, dict_decoder->num_entries()));
}
-
return Status::OK();
}
@@ -1387,176 +1119,55 @@ Status BaseScalarColumnReader::InitDictionaries(
}
Status BaseScalarColumnReader::ReadDataPage() {
- // We're about to move to the next data page. The previous data page is
- // We're about to move to the next data page.  The previous data page is
+ // We're about to move to the next data page. The previous data page is
// now complete, free up any memory allocated for it. If the data page contained
// strings we need to attach it to the returned batch.
- if (PageContainsTupleData(page_encoding_)) {
- parent_->scratch_batch_->aux_mem_pool.AcquireData(data_page_pool_.get(), false);
- } else {
- data_page_pool_->FreeAll();
- }
- // We don't hold any pointers to earlier pages in the stream - we can safely free
- // any I/O or boundary buffer.
- stream_->ReleaseCompletedResources(false);
-
- // Read the next data page, skipping page types we don't care about.
- // We break out of this loop on the non-error case (a data page was found or we read all
- // the pages).
- while (true) {
- DCHECK_EQ(num_buffered_values_, 0);
- if ((DoesPageFiltering() &&
- candidate_page_idx_ == candidate_data_pages_.size() - 1) ||
- num_values_read_ == metadata_->num_values) {
- // No more pages to read
- // TODO: should we check for stream_->eosr()?
- break;
- } else if (num_values_read_ > metadata_->num_values) {
- ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
- metadata_->num_values, num_values_read_, node_.element->name, filename());
- RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
- return Status::OK();
- }
-
- bool eos;
- uint32_t header_size;
- RETURN_IF_ERROR(ReadPageHeader(false /* peek */, &current_page_header_,
- &header_size, &eos));
- if (eos) return Status::OK();
-
- if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
- // Any dictionary is already initialized, as InitDictionary has already
- // been called. There are two possibilities:
- // 1. The parquet file has two dictionary pages
- // OR
- // 2. The parquet file does not have the dictionary as the first data page.
- // Both are errors in the parquet file.
- if (HasDictionaryDecoder()) {
- return Status(Substitute("Corrupt Parquet file '$0': multiple dictionary pages "
- "for column '$1'", filename(), schema_element().name));
- } else {
- return Status(Substitute("Corrupt Parquet file: '$0': dictionary page for "
- "column '$1' is not the first page", filename(), schema_element().name));
- }
- }
-
- Status status;
- int data_size = current_page_header_.compressed_page_size;
- if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
- // We can safely skip non-data pages
- if (!stream_->SkipBytes(data_size, &status)) {
- DCHECK(!status.ok());
- return status;
- }
- continue;
- }
+ col_chunk_reader_.ReleaseResourcesOfLastPage(parent_->scratch_batch_->aux_mem_pool);
+
+ DCHECK_EQ(num_buffered_values_, 0);
+ if ((DoesPageFiltering() &&
+ candidate_page_idx_ == candidate_data_pages_.size() - 1) ||
+ num_values_read_ == metadata_->num_values) {
+ // No more pages to read
+ // TODO: should we check for stream_->eosr()?
+ return Status::OK();
+ } else if (num_values_read_ > metadata_->num_values) {
+ RETURN_IF_ERROR(LogCorruptNumValuesInMetadataError());
+ return Status::OK();
+ }
- // Read Data Page
- // TODO: when we start using page statistics, we will need to ignore certain corrupt
- // statistics. See IMPALA-2208 and PARQUET-251.
- if (!stream_->ReadBytes(data_size, &data_, &status)) {
- DCHECK(!status.ok());
- return status;
- }
- data_end_ = data_ + data_size;
- int num_values = current_page_header_.data_page_header.num_values;
- if (num_values < 0) {
- return Status(Substitute("Error reading data page in Parquet file '$0'. "
+ bool eos;
+ int data_size;
+ RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPage(&eos, &data_, &data_size));
+ if (eos) return HandleTooEarlyEos();
+ data_end_ = data_ + data_size;
+ const parquet::PageHeader& current_page_header = col_chunk_reader_.CurrentPageHeader();
+ int num_values = current_page_header.data_page_header.num_values;
+ if (num_values < 0) {
+ return Status(Substitute("Error reading data page in Parquet file '$0'. "
"Invalid number of values in metadata: $1", filename(), num_values));
- }
-
- num_buffered_values_ = num_values;
- num_values_read_ += num_buffered_values_;
-
- int uncompressed_size = current_page_header_.uncompressed_page_size;
- if (decompressor_.get() != nullptr) {
- SCOPED_TIMER(parent_->decompress_timer_);
- uint8_t* decompressed_buffer;
- RETURN_IF_ERROR(AllocateUncompressedDataPage(
- uncompressed_size, "decompressed data", &decompressed_buffer));
- const Status& status = decompressor_->ProcessBlock32(true,
- current_page_header_.compressed_page_size, data_, &uncompressed_size,
- &decompressed_buffer);
- if (!status.ok()) {
- return Status(Substitute("Error decompressing parquet file '$0' column '$1'"
- " data_page_offset $2: $3", filename(), node_.element->name,
- metadata_->data_page_offset, status.GetDetail()));
- }
-
- VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
- << " to " << uncompressed_size;
- if (current_page_header_.uncompressed_page_size != uncompressed_size) {
- return Status(Substitute("Error decompressing data page in file '$0'. "
- "Expected $1 uncompressed bytes but got $2", filename(),
- current_page_header_.uncompressed_page_size, uncompressed_size));
- }
- data_ = decompressed_buffer;
- data_size = current_page_header_.uncompressed_page_size;
- data_end_ = data_ + data_size;
- if (slot_desc() != nullptr) {
- parent_->scan_node_->UpdateBytesRead(slot_desc()->id(), uncompressed_size,
- current_page_header_.compressed_page_size);
- parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
- parent_->UpdateCompressedPageSizeCounter(
- current_page_header_.compressed_page_size);
- }
- } else {
- DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
- if (current_page_header_.compressed_page_size != uncompressed_size) {
- return Status(Substitute("Error reading data page in file '$0'. "
- "Expected $1 bytes but got $2", filename(),
- current_page_header_.compressed_page_size, uncompressed_size));
- }
- if (PageContainsTupleData(current_page_header_.data_page_header.encoding)) {
- // In this case returned batches will have pointers into the data page itself.
- // We don't transfer disk I/O buffers out of the scanner so we need to copy
- // the page data so that it can be attached to output batches.
- uint8_t* copy_buffer;
- RETURN_IF_ERROR(AllocateUncompressedDataPage(
- uncompressed_size, "uncompressed variable-length data", ©_buffer));
- memcpy(copy_buffer, data_, uncompressed_size);
- data_ = copy_buffer;
- data_end_ = data_ + uncompressed_size;
- }
- if (slot_desc() != nullptr) {
- parent_->scan_node_->UpdateBytesRead(slot_desc()->id(), uncompressed_size, 0);
- parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
- }
- }
+ }
+ num_buffered_values_ = num_values;
+ num_values_read_ += num_buffered_values_;
- // Initialize the repetition level data
- RETURN_IF_ERROR(rep_levels_.Init(filename(),
- &current_page_header_.data_page_header.repetition_level_encoding,
+ /// TODO: Move the level decoder initialisation to ParquetPageReader to abstract away
+ /// the differences between Parquet header V1 and V2.
+ // Initialize the repetition level data
+ RETURN_IF_ERROR(rep_levels_.Init(filename(),
+ &current_page_header.data_page_header.repetition_level_encoding,
parent_->perm_pool_.get(), parent_->state_->batch_size(), max_rep_level(), &data_,
&data_size));
-
- // Initialize the definition level data
- RETURN_IF_ERROR(def_levels_.Init(filename(),
- &current_page_header_.data_page_header.definition_level_encoding,
+ // Initialize the definition level data
+ RETURN_IF_ERROR(def_levels_.Init(filename(),
+ &current_page_header.data_page_header.definition_level_encoding,
parent_->perm_pool_.get(), parent_->state_->batch_size(), max_def_level(), &data_,
&data_size));
+ // Data can be empty if the column contains all NULLs
+ RETURN_IF_ERROR(InitDataPage(data_, data_size));
+ // Skip rows if needed.
+ RETURN_IF_ERROR(StartPageFiltering());
- // Data can be empty if the column contains all NULLs
- RETURN_IF_ERROR(InitDataPage(data_, data_size));
-
- // Skip rows if needed.
- RETURN_IF_ERROR(StartPageFiltering());
-
- if (parent_->candidate_ranges_.empty()) COUNTER_ADD(parent_->num_pages_counter_, 1);
- break;
- }
-
- return Status::OK();
-}
-
-Status BaseScalarColumnReader::AllocateUncompressedDataPage(int64_t size,
- const char* err_ctx, uint8_t** buffer) {
- *buffer = data_page_pool_->TryAllocate(size);
- if (*buffer == nullptr) {
- string details =
- Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "ReadDataPage", size, err_ctx);
- return data_page_pool_->mem_tracker()->MemLimitExceeded(
- parent_->state_, details, size);
- }
+ if (parent_->candidate_ranges_.empty()) COUNTER_ADD(parent_->num_pages_counter_, 1);
return Status::OK();
}
@@ -1767,6 +1378,19 @@ Status BaseScalarColumnReader::GetUnsupportedDecodingError() {
filename(), PrintThriftEnum(page_encoding_), schema_element().name));
}
+Status BaseScalarColumnReader::LogCorruptNumValuesInMetadataError() {
+ ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
+ metadata_->num_values, num_values_read_, node_.element->name, filename());
+ return parent_->state_->LogOrReturnError(msg);
+}
+
+Status BaseScalarColumnReader::HandleTooEarlyEos() {
+ // The data pages contain fewer values than stated in the column metadata.
+ DCHECK(col_chunk_reader_.stream()->eosr());
+ DCHECK_LT(num_values_read_, metadata_->num_values);
+ return LogCorruptNumValuesInMetadataError();
+}
+
bool BaseScalarColumnReader::NextPage() {
parent_->assemble_rows_timer_.Stop();
parent_->parse_status_ = ReadDataPage();
diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h
index 6571379..1d035bf 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -22,9 +22,7 @@
#include "exec/parquet/hdfs-parquet-scanner.h"
#include "exec/parquet/parquet-level-decoder.h"
-#include "runtime/io/request-ranges.h"
-#include "util/bit-stream-utils.h"
-#include "util/codec.h"
+#include "exec/parquet/parquet-column-chunk-reader.h"
namespace impala {
@@ -244,7 +242,8 @@ class BaseScalarColumnReader : public ParquetColumnReader {
BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
const SlotDescriptor* slot_desc)
: ParquetColumnReader(parent, node, slot_desc),
- data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
+ col_chunk_reader_(parent, node.element->name,
+ slot_desc != nullptr ? slot_desc->id() : -1, PageReaderValueMemoryType()) {
DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
}
@@ -264,7 +263,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
/// before any of the column data can be read (including dictionary and
/// data pages). Returns an error status if there was an error starting the
/// scan or allocating buffers for it.
- Status StartScan();
+ Status StartScan() { return col_chunk_reader_.StartScan(); }
/// Helper to start scans for multiple columns at once.
static Status StartScans(const std::vector<BaseScalarColumnReader*> readers) {
@@ -274,14 +273,14 @@ class BaseScalarColumnReader : public ParquetColumnReader {
virtual void Close(RowBatch* row_batch);
- io::ScanRange* scan_range() const { return scan_range_; }
+ io::ScanRange* scan_range() const { return col_chunk_reader_.scan_range(); }
int64_t total_len() const { return metadata_->total_compressed_size; }
int col_idx() const { return node_.col_idx; }
THdfsCompression::type codec() const {
if (metadata_ == NULL) return THdfsCompression::NONE;
return ConvertParquetToImpalaCodec(metadata_->codec);
}
- void set_io_reservation(int bytes) { io_reservation_ = bytes; }
+ void set_io_reservation(int bytes) { col_chunk_reader_.set_io_reservation(bytes); }
/// Reads the next definition and repetition levels for this column. Initializes the
/// next data page if necessary.
@@ -316,6 +315,8 @@ class BaseScalarColumnReader : public ParquetColumnReader {
// Friend parent scanner so it can perform validation (e.g. ValidateEndOfRowGroup())
friend class HdfsParquetScanner;
+ ParquetColumnChunkReader col_chunk_reader_;
+
// Class members that are accessed for every column should be included up here so they
// fit in as few cache lines as possible.
@@ -348,26 +349,6 @@ class BaseScalarColumnReader : public ParquetColumnReader {
/// Metadata for the column for the current row group.
const parquet::ColumnMetaData* metadata_ = nullptr;
- boost::scoped_ptr<Codec> decompressor_;
-
- /// The scan range for the column's data. Initialized for each row group by Reset().
- io::ScanRange* scan_range_ = nullptr;
-
- // Stream used to read data from 'scan_range_'. Initialized by StartScan().
- ScannerContext::Stream* stream_ = nullptr;
-
- /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set
- /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group
- /// by Reset().
- int64_t io_reservation_ = 0;
-
- /// Pool to allocate storage for data pages from - either decompression buffers for
- /// compressed data pages or copies of the data page with var-len data to attach to
- /// batches.
- boost::scoped_ptr<MemPool> data_page_pool_;
-
- /// Header for current data page.
- parquet::PageHeader current_page_header_;
/////////////////////////////////////////
/// BEGIN: Members used for page filtering
@@ -453,19 +434,19 @@ class BaseScalarColumnReader : public ParquetColumnReader {
/// 'size' bytes remaining.
virtual Status InitDataPage(uint8_t* data, int size) = 0;
- /// Allocate memory for the uncompressed contents of a data page of 'size' bytes from
- /// 'data_page_pool_'. 'err_ctx' provides context for error messages. On success,
- /// 'buffer' points to the allocated memory. Otherwise an error status is returned.
- Status AllocateUncompressedDataPage(
- int64_t size, const char* err_ctx, uint8_t** buffer);
-
- /// Returns true if a data page for this column with the specified 'encoding' may
- /// contain strings referenced by returned batches. Cases where this is not true are:
- /// * Dictionary-compressed pages, where any string data lives in 'dictionary_pool_'.
- /// * Fixed-length slots, where there is no string data.
- bool PageContainsTupleData(parquet::Encoding::type page_encoding) {
- return page_encoding != parquet::Encoding::PLAIN_DICTIONARY
- && slot_desc_ != nullptr && slot_desc_->type().IsVarLenStringType();
+ ParquetColumnChunkReader::ValueMemoryType PageReaderValueMemoryType() {
+ if (slot_desc_ == nullptr) {
+ return ParquetColumnChunkReader::ValueMemoryType::NO_SLOT_DESC;
+ }
+
+ const ColumnType& col_type = slot_desc_->type();
+ if (col_type.IsStringType()) {
+ if (col_type.IsVarLenStringType()) {
+ return ParquetColumnChunkReader::ValueMemoryType::VAR_LEN_STR;
+ }
+ return ParquetColumnChunkReader::ValueMemoryType::FIXED_LEN_STR;
+ }
+ return ParquetColumnChunkReader::ValueMemoryType::SCALAR;
}
/// Resets structures related to page filtering.
@@ -561,6 +542,9 @@ class BaseScalarColumnReader : public ParquetColumnReader {
// Returns a detailed error message about unsupported encoding.
Status GetUnsupportedDecodingError();
+
+  /// Returns an error status indicating that the number of values recorded in the
+  /// column metadata is corrupt (inconsistent with what was read from the pages).
+  Status LogCorruptNumValuesInMetadataError();
+
+  /// Returns an error status for the case where the stream ends before all of the
+  /// expected values have been read.
+  Status HandleTooEarlyEos();
};
// Inline to allow inlining into collection and scalar column reader.
diff --git a/be/src/exec/parquet/parquet-page-reader.cc b/be/src/exec/parquet/parquet-page-reader.cc
new file mode 100644
index 0000000..7765feb
--- /dev/null
+++ b/be/src/exec/parquet/parquet-page-reader.cc
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet/parquet-page-reader.h"
+
+#include <sstream>
+#include <string>
+#include <gflags/gflags.h>
+#include <gutil/strings/substitute.h>
+
+#include "common/names.h"
+#include "exec/scanner-context.inline.h"
+#include "rpc/thrift-util.h"
+#include "runtime/exec-env.h"
+#include "util/pretty-printer.h"
+
+
+// Max data page header size in bytes. This is an estimate and only needs to be an upper
+// bound. It is theoretically possible to have a page header of any size due to string
+// value statistics, but in practice we'll have trouble reading string values this large.
+// Also, this limit is in place to prevent Impala from reading corrupt Parquet files.
+DEFINE_int32(max_page_header_size, 8*1024*1024, "max Parquet page header size in bytes");
+
+using namespace impala::io;
+
+using parquet::Encoding;
+
+namespace impala {
+
+// Max dictionary page header size in bytes. This is an estimate and only needs to be an
+// upper bound.
+static const int MAX_DICT_HEADER_SIZE = 100;
+
+Status ParquetPageReader::InitColumnChunk(const HdfsFileDesc& file_desc,
+ const parquet::ColumnChunk& col_chunk, int row_group_idx,
+ std::vector<io::ScanRange::SubRange>&& sub_ranges) {
+ int64_t col_start = col_chunk.meta_data.data_page_offset;
+ if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+ // Already validated in ValidateColumnOffsets()
+ DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
+ col_start = col_chunk.meta_data.dictionary_page_offset;
+ }
+
+ int64_t col_len = col_chunk.meta_data.total_compressed_size;
+ if (col_len <= 0) {
+ return Status(Substitute("File '$0' contains invalid column chunk size: $1",
+ filename(), col_len));
+ }
+
+ int64_t col_end = col_start + col_len;
+ // Already validated in ValidateColumnOffsets()
+ DCHECK_GT(col_end, 0);
+ DCHECK_LT(col_end, file_desc.file_length);
+ const ParquetFileVersion& file_version = parent_->file_version_;
+ if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) {
+ // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+ // dictionary page header size in total_compressed_size and total_uncompressed_size
+ // (see IMPALA-694). We pad col_len to compensate.
+ int64_t bytes_remaining = file_desc.file_length - col_end;
+ int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
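+    // E.g. if only 40 bytes remain in the file after col_end, pad is 40 rather
+    // than the full MAX_DICT_HEADER_SIZE of 100.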
+ col_len += pad;
+ col_end += pad;
+ }
+
+ if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
+ return Status(Substitute("Expected parquet column file path '$0' to match "
+ "filename '$1'", col_chunk.file_path, filename()));
+ }
+
+ const ScanRange* metadata_range = parent_->metadata_range_;
+ int64_t partition_id = parent_->context_->partition_descriptor()->id();
+ const ScanRange* split_range =
+ static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
+ // Determine if the column is completely contained within a local split.
+ bool col_range_local = split_range->expected_local()
+ && col_start >= split_range->offset()
+ && col_end <= split_range->offset() + split_range->len();
+ scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
+ filename(), col_len, col_start, move(sub_ranges),
+ partition_id, split_range->disk_id(),
+ col_range_local, split_range->is_erasure_coded(), file_desc.mtime,
+ BufferOpts(split_range->cache_options()));
+ page_headers_read_ = 0;
+ dictionary_header_encountered_ = false;
+ state_ = State::Initialized;
+ return Status::OK();
+}
+
+Status ParquetPageReader::StartScan(int io_reservation) {
+ DCHECK_EQ(state_, State::Initialized);
+ DCHECK_GT(io_reservation, 0);
+  DCHECK(scan_range_ != nullptr) << "Must call InitColumnChunk() before starting scan.";
+
+ DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+ ScannerContext* context = parent_->context_;
+ bool needs_buffers;
+ RETURN_IF_ERROR(parent_->scan_node_->reader_context()->StartScanRange(
+ scan_range_, &needs_buffers));
+ if (needs_buffers) {
+ RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
+ context->bp_client(), scan_range_, io_reservation));
+ }
+ stream_ = parent_->context_->AddStream(scan_range_, io_reservation);
+ DCHECK(stream_ != nullptr);
+
+ state_ = State::ToReadHeader;
+ return Status::OK();
+}
+
+Status ParquetPageReader::ReadPageHeader(bool* eos) {
+ DCHECK(state_ == State::ToReadHeader || state_ == State::ToReadData);
+ DCHECK(stream_ != nullptr);
+
+ *eos = false;
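+  // The current page's data has not been read or skipped yet, so the current
+  // header is still valid and the stream must not be advanced.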
+ if (state_ == State::ToReadData) return Status::OK();
+
+ uint8_t* buffer;
+ int64_t buffer_size;
+ RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
+ // Check for end of stream.
+ if (buffer_size == 0) {
+ DCHECK(stream_->eosr());
+ *eos = true;
+ return Status::OK();
+ }
+ // We don't know the actual header size until the thrift object is deserialized. Loop
+ // until we successfully deserialize the header or exceed the maximum header size.
+ uint32_t header_size;
+ Status status;
+ parquet::PageHeader header;
+ while (true) {
+ header_size = buffer_size;
+ status = DeserializeThriftMsg(buffer, &header_size, true, &header);
+ if (status.ok()) break;
+
+ if (buffer_size >= FLAGS_max_page_header_size) {
+ stringstream ss;
+ ss << "ParquetScanner: could not read data page because page header exceeded "
+ << "maximum size of "
+ << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
+ status.AddDetail(ss.str());
+ return status;
+ }
+    // We didn't read the entire header; increase the buffer size and try again.
+ int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
+ status = Status::OK();
+ bool success = stream_->GetBytes(
+ new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
+ if (!success) {
+ DCHECK(!status.ok());
+ return status;
+ }
+ DCHECK(status.ok());
+ // Even though we increased the allowed buffer size, the number of bytes
+ // read did not change. The header is not limited by the buffer space,
+ // so it must be incomplete in the file.
+ if (buffer_size == new_buffer_size) {
+ DCHECK_NE(new_buffer_size, 0);
+ return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
+ }
+ DCHECK_GT(new_buffer_size, buffer_size);
+ buffer_size = new_buffer_size;
+ }
+ int data_size = header.compressed_page_size;
+ if (UNLIKELY(data_size < 0)) {
+ return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for "
+ "column '$2'", filename(), data_size, schema_name_));
+ }
+ int uncompressed_size = header.uncompressed_page_size;
+ if (UNLIKELY(uncompressed_size < 0)) {
+ return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page "
+ "size $1 for column '$2'", filename(), uncompressed_size,
+ schema_name_));
+ }
+ const bool is_dictionary = header.__isset.dictionary_page_header;
+ if (UNLIKELY(page_headers_read_ != 0 && is_dictionary)) {
+ // Any dictionary is already initialized as it has to be the first page.
+ // There are two possibilities:
+ // 1. The parquet file has two dictionary pages
+ // OR
+ // 2. The parquet file does not have the dictionary as the first data page.
+ // Both are errors in the parquet file.
+ if (dictionary_header_encountered_) {
+      return Status(Substitute("Corrupt Parquet file '$0': multiple dictionary pages "
+          "for column '$1'", filename(), schema_name_));
+ } else {
+      return Status(Substitute("Corrupt Parquet file '$0': dictionary page for "
+          "column '$1' is not the first page", filename(), schema_name_));
+ }
+ }
+ RETURN_IF_ERROR(AdvanceStream(header_size));
+ current_page_header_ = header;
+ header_initialized_ = true;
+ page_headers_read_++;
+ dictionary_header_encountered_ = dictionary_header_encountered_ || is_dictionary;
+ state_ = State::ToReadData;
+ return Status::OK();
+}
+
+Status ParquetPageReader::ReadPageData(uint8_t** data) {
+ DCHECK_EQ(state_, State::ToReadData);
+ Status status;
+ if (!stream_->ReadBytes(current_page_header_.compressed_page_size, data, &status)) {
+ DCHECK(!status.ok());
+ return status;
+ }
+ state_ = State::ToReadHeader;
+ return Status::OK();
+}
+
+Status ParquetPageReader::SkipPageData() {
+ DCHECK_EQ(state_, State::ToReadData);
+ RETURN_IF_ERROR(AdvanceStream(current_page_header_.compressed_page_size));
+ state_ = State::ToReadHeader;
+ return Status::OK();
+}
+
+Status ParquetPageReader::AdvanceStream(int64_t bytes) {
+ Status status;
+ if (!stream_->SkipBytes(bytes, &status)) return status;
+ return Status::OK();
+}
+
+std::ostream& operator<<(std::ostream& out, const ParquetPageReader::State state) {
+ switch (state) {
+ case ParquetPageReader::State::Uninitialized: out << "Uninitialized"; break;
+ case ParquetPageReader::State::Initialized: out << "Initialized"; break;
+ case ParquetPageReader::State::ToReadHeader: out << "ToReadHeader"; break;
+ case ParquetPageReader::State::ToReadData: out << "ToReadData"; break;
+ }
+
+ return out;
+}
+
+} // namespace impala
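For reference, a minimal sketch of how a caller is expected to drive the page reader
above, using only the API from this patch. 'ProcessPage' is a hypothetical stand-in
for the decompression and decoding that ParquetColumnChunkReader performs:

    // Minimal driver loop over the pages of one column chunk (sketch, not part of
    // the patch). Assumes InitColumnChunk() and StartScan() already succeeded.
    Status ReadAllPages(ParquetPageReader* page_reader) {
      while (true) {
        bool eos;
        RETURN_IF_ERROR(page_reader->ReadPageHeader(&eos));
        if (eos) return Status::OK();
        const parquet::PageHeader& header = page_reader->CurrentPageHeader();
        uint8_t* data;
        RETURN_IF_ERROR(page_reader->ReadPageData(&data));
        // ProcessPage() is hypothetical; in the patch, decompression and decoding
        // are handled by ParquetColumnChunkReader.
        RETURN_IF_ERROR(ProcessPage(header, data, header.compressed_page_size));
      }
    }
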
diff --git a/be/src/exec/parquet/parquet-page-reader.h b/be/src/exec/parquet/parquet-page-reader.h
new file mode 100644
index 0000000..b98e94a
--- /dev/null
+++ b/be/src/exec/parquet/parquet-page-reader.h
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/parquet/hdfs-parquet-scanner.h"
+
+namespace impala {
+
+/// A class used to read Parquet page headers and data pages. Memory management is not
+/// handled by this class. Before reading, InitColumnChunk() followed by StartScan() must
+/// be called.
+class ParquetPageReader {
+ public:
+ const char* filename() const { return parent_->filename(); }
+ io::ScanRange* scan_range() const { return scan_range_; }
+ ScannerContext::Stream* stream() const { return stream_; }
+ uint64_t PageHeadersRead() const { return page_headers_read_; }
+
+  ParquetPageReader(HdfsParquetScanner* parent, std::string schema_name)
+    : parent_(parent), schema_name_(std::move(schema_name)) {}
+
+ /// Resets the reader for each row group in the file and creates the scan range for the
+ /// column, but does not start it. To start scanning, call StartScan().
+ Status InitColumnChunk(const HdfsFileDesc& file_desc,
+ const parquet::ColumnChunk& col_chunk, int row_group_idx,
+ std::vector<io::ScanRange::SubRange>&& sub_ranges);
+
+ /// Starts the column scan range. InitColumnChunk() needs to be called before this. This
+ /// method must be called before any of the column data can be read (including
+ /// dictionary and data pages). 'io_reservation' is the amount of reservation assigned
+ /// to the column. Returns an error status if there was an error starting the scan or
+ /// allocating buffers for it.
+ Status StartScan(int io_reservation);
+
+ /// Reads the next page header if the data belonging to the current header has been read
+ /// or skipped, otherwise does nothing. If the stream reaches the end before reading a
+ /// complete page header, '*eos' is set to true.
+ Status ReadPageHeader(bool* eos);
+
+ /// Reads the next data page to 'data'. The header must already be read. It is invalid
+ /// to call this or SkipPageData() again without reading the next header.
+ Status ReadPageData(uint8_t** data);
+
+ /// Skips the next data page. The header must already be read. It is invalid to call
+ /// this or ReadPageData() again without reading the next header.
+ Status SkipPageData();
+
+ const parquet::PageHeader& CurrentPageHeader() const {
+ DCHECK(header_initialized_);
+ return current_page_header_;
+ }
+
+ private:
+ Status AdvanceStream(int64_t bytes);
+
+ HdfsParquetScanner* parent_;
+  std::string schema_name_;
+
+ /// Header for current data page.
+ parquet::PageHeader current_page_header_;
+
+  /// True once 'current_page_header_' holds a successfully deserialized header.
+  bool header_initialized_ = false;
+
+  /// The scan range for the column's data. Initialized for each row group by
+  /// InitColumnChunk().
+ io::ScanRange* scan_range_ = nullptr;
+
+ /// Stream used to read data from 'scan_range_'. Initialized by StartScan().
+ ScannerContext::Stream* stream_ = nullptr;
+
+  /// Number of page headers read from this column chunk so far.
+  uint64_t page_headers_read_ = 0;
+
+  /// True if a dictionary page header has already been encountered in this column
+  /// chunk. Used to detect corrupt files with multiple dictionary pages.
+  bool dictionary_header_encountered_ = false;
+
+  /// We maintain a state machine for debugging purposes. The state transitions are
+ /// the following:
+ ///
+ /// Uninitialized
+ /// +
+ /// |
+ /// | (InitColumnChunk)
+ /// |
+ /// v
+ /// +--------> Initialized
+ /// | +
+ /// | |
+ /// | | (StartScan)
+ /// | |
+ /// | v
+ /// (InitColumnChunk) +--------+ ToReadHeader <--------------+
+ /// | + |
+ /// | | |
+ /// | | (ReadPageHeader) | (ReadPageData,
+ /// | | | SkipPageData)
+ /// | v |
+ /// +--------+ ToReadData +--------------+
+ ///
+ enum class State {
+ Uninitialized,
+ Initialized,
+ ToReadHeader,
+ ToReadData,
+ };
+
+  friend std::ostream& operator<<(
+      std::ostream& out, const ParquetPageReader::State state);
+
+ State state_ = State::Uninitialized;
+};
+
+} // namespace impala
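
To make the state transitions concrete, here is a sketch of one column chunk's
lifecycle under the diagram above. 'scanner', 'file_desc', 'col_chunk',
'row_group_idx' and 'io_reservation' are assumed to come from the surrounding
scanner code and are not defined by this patch:

    // Sketch only: walks the states Uninitialized -> Initialized -> ToReadHeader
    // -> ToReadData -> ToReadHeader, mirroring the diagram above.
    Status ScanFirstPage(HdfsParquetScanner* scanner, const HdfsFileDesc& file_desc,
        const parquet::ColumnChunk& col_chunk, int row_group_idx, int io_reservation) {
      ParquetPageReader reader(scanner, "example_column");  // Uninitialized
      RETURN_IF_ERROR(reader.InitColumnChunk(
          file_desc, col_chunk, row_group_idx, {}));        // -> Initialized
      RETURN_IF_ERROR(reader.StartScan(io_reservation));    // -> ToReadHeader
      bool eos;
      RETURN_IF_ERROR(reader.ReadPageHeader(&eos));         // -> ToReadData (unless eos)
      if (!eos) RETURN_IF_ERROR(reader.SkipPageData());     // -> ToReadHeader
      return Status::OK();
    }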