You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/11/27 02:02:03 UTC
[13/14] impala git commit: IMPALA-7869: break up
parquet-column-readers.cc
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
deleted file mode 100644
index 8a8b700..0000000
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ /dev/null
@@ -1,1681 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/hdfs-parquet-scanner.h"
-
-#include <algorithm>
-#include <queue>
-
-#include <gflags/gflags.h>
-#include <gutil/strings/substitute.h>
-
-#include "codegen/codegen-anyval.h"
-#include "exec/hdfs-scan-node.h"
-#include "exec/parquet-column-readers.h"
-#include "exec/parquet-column-stats.h"
-#include "exec/scanner-context.inline.h"
-#include "runtime/collection-value-builder.h"
-#include "runtime/exec-env.h"
-#include "runtime/io/disk-io-mgr.h"
-#include "runtime/io/request-context.h"
-#include "runtime/runtime-state.h"
-#include "runtime/runtime-filter.inline.h"
-#include "rpc/thrift-util.h"
-
-#include "common/names.h"
-
-DECLARE_bool(convert_legacy_hive_parquet_utc_timestamps);
-
-using std::move;
-using std::sort;
-using namespace impala;
-using namespace impala::io;
-
-// Max entries in the dictionary before switching to PLAIN encoding. If a dictionary
-// has fewer entries, then the entire column is dictionary encoded. This threshold
-// is guaranteed to be true for Impala versions 2.9 or below.
-// THIS RECORDS INFORMATION ABOUT PAST BEHAVIOR. DO NOT CHANGE THIS CONSTANT.
-const int LEGACY_IMPALA_MAX_DICT_ENTRIES = 40000;
-
-const int16_t HdfsParquetScanner::ROW_GROUP_END;  // Out-of-class definition; no value is given here.
-const int16_t HdfsParquetScanner::INVALID_LEVEL;  // Out-of-class definition; no value is given here.
-const int16_t HdfsParquetScanner::INVALID_POS;  // Out-of-class definition; no value is given here.
-
-const char* HdfsParquetScanner::LLVM_CLASS_NAME = "class.impala::HdfsParquetScanner";  // NOTE(review): presumably the class's type name in LLVM IR, used by codegen - confirm.
-
-static const string PARQUET_MEM_LIMIT_EXCEEDED =  // Substitute() template: $0 = function name, $1 = byte count, $2 = what was being allocated.
- "HdfsParquetScanner::$0() failed to allocate $1 bytes for $2.";
-
-namespace impala {
-
-static const string IDEAL_RESERVATION_COUNTER_NAME = "ParquetRowGroupIdealReservation";  // Summary-stats counter name registered in IssueInitialRanges().
-static const string ACTUAL_RESERVATION_COUNTER_NAME = "ParquetRowGroupActualReservation";  // Summary-stats counter name registered in IssueInitialRanges().
-
-// Registers the Parquet-specific reservation counters (in bytes) on the scan node's
-// runtime profile, rejects any file that is too short to be a Parquet file, and then
-// issues the footer ranges for 'files'. 'files' must not be empty.
-Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
-    const vector<HdfsFileDesc*>& files) {
-  DCHECK(!files.empty());
-  // Parquet-specific summary counters.
-  ADD_SUMMARY_STATS_COUNTER(
-      scan_node->runtime_profile(), IDEAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
-  ADD_SUMMARY_STATS_COUNTER(
-      scan_node->runtime_profile(), ACTUAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
-
-  // A file shorter than this many bytes is an invalid Parquet file.
-  const int64_t min_valid_file_length = 12;
-  for (const HdfsFileDesc* desc : files) {
-    if (desc->file_length >= min_valid_file_length) continue;
-    return Status(Substitute("Parquet file $0 has an invalid file length: $1",
-        desc->filename, desc->file_length));
-  }
-  return IssueFooterRanges(scan_node, THdfsFileFormat::PARQUET, files);
-}
-
-HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
- : HdfsScanner(scan_node, state),
- row_group_idx_(-1),  // -1 means no row group has been selected yet; NextRowGroup() increments it.
- row_group_rows_read_(0),
- advance_row_group_(true),  // Makes the first GetNextInternal() call enter its NextRowGroup() loop.
- min_max_tuple_(nullptr),  // Allocated in Open() when the scan node has a min/max tuple descriptor.
- row_batches_produced_(0),
- scratch_batch_(new ScratchTupleBatch(
- *scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())),
- metadata_range_(nullptr),  // Set in Open() from the stream's scan range.
- dictionary_pool_(new MemPool(scan_node->mem_tracker())),
- assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
- process_footer_timer_stats_(nullptr),  // This and the counters below are created in Open().
- num_cols_counter_(nullptr),
- num_stats_filtered_row_groups_counter_(nullptr),
- num_row_groups_counter_(nullptr),
- num_scanners_with_no_reads_counter_(nullptr),
- num_dict_filtered_row_groups_counter_(nullptr),
- coll_items_read_counter_(0),
- codegend_process_scratch_batch_fn_(nullptr) {
- assemble_rows_timer_.Stop();  // NOTE(review): presumably the timer starts running on construction - confirm. It is restarted around row assembly in GetNextInternal().
-}
-
-Status HdfsParquetScanner::Open(ScannerContext* context) {  // Registers counters, processes the file footer and creates the column readers.
- RETURN_IF_ERROR(HdfsScanner::Open(context));
- metadata_range_ = stream_->scan_range();  // Saved now: 'stream_' is reset to nullptr below once the footer has been processed.
- num_cols_counter_ =
- ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
- num_stats_filtered_row_groups_counter_ =
- ADD_COUNTER(scan_node_->runtime_profile(), "NumStatsFilteredRowGroups",
- TUnit::UNIT);
- num_row_groups_counter_ =
- ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT);
- num_scanners_with_no_reads_counter_ =
- ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads", TUnit::UNIT);
- num_dict_filtered_row_groups_counter_ =
- ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups", TUnit::UNIT);
- process_footer_timer_stats_ =
- ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), "FooterProcessingTime");
-
- codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>(
- scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET));
- if (codegend_process_scratch_batch_fn_ == nullptr) {  // No codegen'd function is available for this scan node.
- scan_node_->IncNumScannersCodegenDisabled();
- } else {
- scan_node_->IncNumScannersCodegenEnabled();
- }
-
- perm_pool_.reset(new MemPool(scan_node_->mem_tracker()));  // Backs the statistics tuple below and the dictionary-filter tuples; freed in Close().
-
- // Allocate tuple buffer to evaluate conjuncts on parquet::Statistics.
- const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
- if (min_max_tuple_desc != nullptr) {
- int64_t tuple_size = min_max_tuple_desc->byte_size();
- uint8_t* buffer = perm_pool_->TryAllocate(tuple_size);
- if (buffer == nullptr) {  // Allocation failed: report it as a memory-limit error.
- string details = Substitute("Could not allocate buffer of $0 bytes for Parquet "
- "statistics tuple for file '$1'.", tuple_size, filename());
- return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size);
- }
- min_max_tuple_ = reinterpret_cast<Tuple*>(buffer);
- }
-
- // Clone the min/max statistics conjuncts.
- RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
- expr_perm_pool_.get(), context_->expr_results_pool(),
- scan_node_->min_max_conjunct_evals(), &min_max_conjunct_evals_));
-
- for (int i = 0; i < context->filter_ctxs().size(); ++i) {  // Keep a pointer to each of the context's runtime filter contexts.
- const FilterContext* ctx = &context->filter_ctxs()[i];
- DCHECK(ctx->filter != nullptr);
- filter_ctxs_.push_back(ctx);
- }
- filter_stats_.resize(filter_ctxs_.size());  // One local stats entry per filter context, indexed in parallel with 'filter_ctxs_'.
-
- DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
-
- // Each scan node can process multiple splits. Each split processes the footer once.
- // We use a timer to measure the time taken to ProcessFooter() per split and add
- // this time to the averaged timer.
- MonotonicStopWatch single_footer_process_timer;
- single_footer_process_timer.Start();
- // First process the file metadata in the footer.
- Status footer_status = ProcessFooter();
- single_footer_process_timer.Stop();
-
- process_footer_timer_stats_->UpdateCounter(single_footer_process_timer.ElapsedTime());
-
- // Release I/O buffers immediately to make sure they are cleaned up
- // in case we return a non-OK status anywhere below.
- stream_ = nullptr;
- context_->ReleaseCompletedResources(true);
- context_->ClearStreams();
- RETURN_IF_ERROR(footer_status);  // Checked only now so that the release above also happens when ProcessFooter() failed.
-
- // Parse the file schema into an internal representation for schema resolution.
- schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),
- state_->query_options().parquet_fallback_schema_resolution,
- state_->query_options().parquet_array_resolution));
- RETURN_IF_ERROR(schema_resolver_->Init(&file_metadata_, filename()));
-
- // We've processed the metadata and there are columns that need to be materialized.
- RETURN_IF_ERROR(CreateColumnReaders(
- *scan_node_->tuple_desc(), *schema_resolver_, &column_readers_));
- COUNTER_SET(num_cols_counter_,
- static_cast<int64_t>(CountScalarColumns(column_readers_)));
- // Set top-level template tuple.
- template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
-
- RETURN_IF_ERROR(InitDictFilterStructures());  // Partitions the readers and allocates the dictionary-filter tuples.
- return Status::OK();
-}
-
-void HdfsParquetScanner::Close(RowBatch* row_batch) {  // Tears down the scanner; a non-null 'row_batch' takes over the remaining tuple memory.
- DCHECK(!is_closed_);
- if (row_batch != nullptr) {
- FlushRowGroupResources(row_batch);
- row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
- if (scan_node_->HasRowBatchQueue()) {
- static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
- unique_ptr<RowBatch>(row_batch));  // The unique_ptr takes ownership of 'row_batch' here.
- }
- } else {  // No output batch: free the resources instead of transferring them.
- template_tuple_pool_->FreeAll();
- dictionary_pool_->FreeAll();
- context_->ReleaseCompletedResources(true);
- for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
- // The scratch batch may still contain tuple data. We can get into this case if
- // Open() fails or if the query is cancelled.
- scratch_batch_->ReleaseResources(nullptr);
- }
- if (perm_pool_ != nullptr) {  // May still be null if Open() returned before creating the pool.
- perm_pool_->FreeAll();
- perm_pool_.reset();
- }
-
- // Verify all resources (if any) have been transferred.
- DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
- DCHECK_EQ(dictionary_pool_->total_allocated_bytes(), 0);
- DCHECK_EQ(scratch_batch_->total_allocated_bytes(), 0);
-
- // Collect compression types for reporting completed ranges.
- vector<THdfsCompression::type> compression_types;
- stack<ParquetColumnReader*> readers;  // Work list for an iterative walk over the column reader tree.
- for (ParquetColumnReader* r: column_readers_) readers.push(r);
- while (!readers.empty()) {
- ParquetColumnReader* reader = readers.top();
- readers.pop();
- if (reader->IsCollectionReader()) {  // Collection readers contribute no codec themselves; descend into their children.
- CollectionColumnReader* coll_reader = static_cast<CollectionColumnReader*>(reader);
- for (ParquetColumnReader* r: *coll_reader->children()) readers.push(r);
- continue;
- }
- BaseScalarColumnReader* scalar_reader = static_cast<BaseScalarColumnReader*>(reader);
- compression_types.push_back(scalar_reader->codec());
- }
- assemble_rows_timer_.Stop();
- assemble_rows_timer_.ReleaseCounter();
-
- // If this was a metadata only read (i.e. count(*)), there are no columns.
- if (compression_types.empty()) {
- compression_types.push_back(THdfsCompression::NONE);
- scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types, true);  // NOTE(review): the extra 'true' is passed only in this no-column case; its meaning is defined by RangeComplete() - confirm.
- } else {
- scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types);
- }
- if (schema_resolver_.get() != nullptr) schema_resolver_.reset();
-
- ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_);
-
- for (int i = 0; i < filter_ctxs_.size(); ++i) {  // Fold this scanner's local filter statistics into each filter context's stats.
- const FilterStats* stats = filter_ctxs_[i]->stats;
- const LocalFilterStats& local = filter_stats_[i];
- stats->IncrCounters(FilterStats::ROWS_KEY, local.total_possible,
- local.considered, local.rejected);
- }
-
- CloseInternal();
-}
-
-// Returns the file offset at which 'column' starts: the dictionary page offset when the
-// metadata has one set, otherwise the offset of the data page.
-static int64_t GetColumnStartOffset(const parquet::ColumnMetaData& column) {
-  if (!column.__isset.dictionary_page_offset) return column.data_page_offset;
-  // When present, the dictionary page is expected to come before the data pages.
-  DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
-  return column.dictionary_page_offset;
-}
-
-// Returns the file offset halfway between the start of the first column of 'row_group'
-// and the end of its last column (start offset plus total compressed size).
-static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) {
-  const parquet::ColumnMetaData& first_col = row_group.columns.front().meta_data;
-  const parquet::ColumnMetaData& final_col = row_group.columns.back().meta_data;
-
-  const int64_t begin = GetColumnStartOffset(first_col);
-  const int64_t end = GetColumnStartOffset(final_col) + final_col.total_compressed_size;
-  return begin + (end - begin) / 2;
-}
-
-// Returns true if the byte range covered by 'row_group' overlaps with 'split_range'.
-// The row group spans from the start of its first column to the end of its last column.
-static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group,
-    const ScanRange* split_range) {
-  const parquet::ColumnMetaData& final_col = row_group.columns.back().meta_data;
-  const int64_t group_begin = GetColumnStartOffset(row_group.columns.front().meta_data);
-  const int64_t group_end =
-      GetColumnStartOffset(final_col) + final_col.total_compressed_size;
-
-  const int64_t split_begin = split_range->offset();
-  const int64_t split_end = split_begin + split_range->len();
-
-  // The split starts inside the row group.
-  const bool starts_inside = split_begin >= group_begin && split_begin < group_end;
-  // The split ends inside the row group.
-  const bool ends_inside = split_end > group_begin && split_end <= group_end;
-  // The split covers the whole row group.
-  const bool covers_group = split_begin <= group_begin && split_end >= group_end;
-  return starts_inside || ends_inside || covers_group;
-}
-
-// Returns the number of scalar (non-collection) column readers reachable from
-// 'column_readers', descending through the children of collection readers.
-// 'column_readers' may only be empty when the scan node optimizes count(*).
-int HdfsParquetScanner::CountScalarColumns(
-    const vector<ParquetColumnReader*>& column_readers) {
-  DCHECK(!column_readers.empty() || scan_node_->optimize_parquet_count_star());
-  int num_columns = 0;
-  stack<ParquetColumnReader*> readers;
-  // Seed the walk from the argument, not from the 'column_readers_' member, so that
-  // the result describes the readers the caller passed in (and the DCHECK above checks
-  // the same vector that is counted).
-  for (ParquetColumnReader* r : column_readers) readers.push(r);
-  while (!readers.empty()) {
-    ParquetColumnReader* col_reader = readers.top();
-    readers.pop();
-    if (col_reader->IsCollectionReader()) {
-      // A collection reader is not counted itself; only its scalar descendants are.
-      CollectionColumnReader* collection_reader =
-          static_cast<CollectionColumnReader*>(col_reader);
-      for (ParquetColumnReader* r : *collection_reader->children()) readers.push(r);
-      continue;
-    }
-    ++num_columns;
-  }
-  return num_columns;
-}
-
-// Materializes row batches for this split and appends them to the scan node's row batch
-// queue until the end of the scan range is reached, the scan node's limit is reached,
-// a runtime filter is found to be always false, or an error occurs.
-Status HdfsParquetScanner::ProcessSplit() {
-  DCHECK(scan_node_->HasRowBatchQueue());
-  HdfsScanNode* queue_node = static_cast<HdfsScanNode*>(scan_node_);
-  while (true) {
-    const bool always_false = FilterContext::CheckForAlwaysFalse(
-        FilterStats::SPLITS_KEY, context_->filter_ctxs());
-    if (always_false) {
-      eos_ = true;
-      break;
-    }
-    unique_ptr<RowBatch> out_batch = make_unique<RowBatch>(scan_node_->row_desc(),
-        state_->batch_size(), scan_node_->mem_tracker());
-    Status next_status = GetNextInternal(out_batch.get());
-    // The batch is queued even on error because it may contain data referenced by
-    // previously appended batches.
-    queue_node->AddMaterializedRowBatch(move(out_batch));
-    RETURN_IF_ERROR(next_status);
-    ++row_batches_produced_;
-    const bool check_filters =
-        (row_batches_produced_ & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) == 0;
-    if (check_filters) CheckFiltersEffectiveness();
-    if (eos_ || scan_node_->ReachedLimit()) break;
-  }
-  return Status::OK();
-}
-
-// Fills 'row_batch' with the next rows of this scan range and sets 'eos_' once the
-// range is exhausted. There are three modes:
-//  1. count(*) optimization: emits one row per row group whose single slot holds the
-//     row group's 'num_rows' from the file metadata.
-//  2. Zero-slot scan: emits template tuples based only on 'file_metadata_.num_rows',
-//     without reading column data.
-//  3. Regular scan: drains the scratch batch, advances to the next row group when
-//     needed and assembles rows from the column readers.
-// Returns a non-OK status for errors that cannot be logged and skipped.
-Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
-  DCHECK(parse_status_.ok()) << parse_status_.GetDetail();
-  if (scan_node_->optimize_parquet_count_star()) {
-    // Populate the single slot with the Parquet num rows statistic.
-    int64_t tuple_buf_size;
-    uint8_t* tuple_buf;
-    // We try to allocate a smaller row batch here because in most cases the number of
-    // row groups in a file is much lower than the default row batch capacity.
-    int capacity = min(
-        static_cast<int>(file_metadata_.row_groups.size()), row_batch->capacity());
-    RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(state_,
-        row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(),
-        &capacity, &tuple_buf_size, &tuple_buf));
-    while (!row_batch->AtCapacity()) {
-      RETURN_IF_ERROR(NextRowGroup());
-      DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
-      DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
-      if (row_group_idx_ == file_metadata_.row_groups.size()) break;
-      Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
-      TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
-      InitTuple(template_tuple_, dst_tuple);
-      int64_t* dst_slot =
-          dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset());
-      *dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows;
-      row_group_rows_read_ += *dst_slot;
-      dst_row->SetTuple(0, dst_tuple);
-      row_batch->CommitLastRow();
-      tuple_buf += scan_node_->tuple_desc()->byte_size();
-    }
-    eos_ = row_group_idx_ == file_metadata_.row_groups.size();
-    return Status::OK();
-  } else if (scan_node_->IsZeroSlotTableScan()) {
-    // There are no materialized slots and we are not optimizing count(*), e.g.
-    // "select 1 from alltypes". We can serve this query from just the file metadata.
-    // We don't need to read the column data.
-    if (row_group_rows_read_ == file_metadata_.num_rows) {
-      eos_ = true;
-      return Status::OK();
-    }
-    assemble_rows_timer_.Start();
-    DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
-    int64_t rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
-    int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
-    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
-    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
-    Status status = CommitRows(row_batch, num_to_commit);
-    assemble_rows_timer_.Stop();
-    RETURN_IF_ERROR(status);
-    row_group_rows_read_ += max_tuples;
-    // COUNTER_ADD() takes a delta, so add only the rows handled by this call.
-    // 'row_group_rows_read_' is the running total for the file; adding it on every
-    // call would over-count the rows read as soon as a second batch is produced.
-    COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
-    return Status::OK();
-  }
-
-  // Transfer remaining tuples from the scratch batch.
-  if (!scratch_batch_->AtEnd()) {
-    assemble_rows_timer_.Start();
-    int num_row_to_commit = TransferScratchTuples(row_batch);
-    assemble_rows_timer_.Stop();
-    RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
-    if (row_batch->AtCapacity()) return Status::OK();
-  }
-
-  while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) {
-    // Transfer resources and clear streams if there is any leftover from the previous
-    // row group. We will create new streams for the next row group.
-    FlushRowGroupResources(row_batch);
-    if (!advance_row_group_) {
-      // The row group ended normally: check that the readers agree on its end.
-      Status status =
-          ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_);
-      if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
-    }
-    RETURN_IF_ERROR(NextRowGroup());
-    DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
-    if (row_group_idx_ == file_metadata_.row_groups.size()) {
-      // No row group is left for this scanner.
-      eos_ = true;
-      DCHECK(parse_status_.ok());
-      return Status::OK();
-    }
-  }
-
-  // Apply any runtime filters to static tuples containing the partition keys for this
-  // partition. If any filter fails, we return immediately and stop processing this
-  // scan range.
-  if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
-      FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
-    eos_ = true;
-    DCHECK(parse_status_.ok());
-    return Status::OK();
-  }
-  assemble_rows_timer_.Start();
-  Status status = AssembleRows(column_readers_, row_batch, &advance_row_group_);
-  assemble_rows_timer_.Stop();
-  RETURN_IF_ERROR(status);
-  if (!parse_status_.ok()) {
-    // Either abort with the parse error or log it and carry on with a clean status.
-    RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
-    parse_status_ = Status::OK();
-  }
-
-  return Status::OK();
-}
-
-Status HdfsParquetScanner::EvaluateStatsConjuncts(
- const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
- bool* skip_row_group) {  // Sets '*skip_row_group' to true when column statistics (or a missing / all-NULL column) rule out 'row_group'.
- *skip_row_group = false;
-
- if (!state_->query_options().parquet_read_statistics) return Status::OK();  // Statistics are disabled by query option.
-
- const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
- if (!min_max_tuple_desc) return Status::OK();  // No min/max tuple descriptor, so nothing to evaluate.
-
- int64_t tuple_size = min_max_tuple_desc->byte_size();
-
- DCHECK(min_max_tuple_ != nullptr);
- min_max_tuple_->Init(tuple_size);
-
- DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size());
- for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {  // Slot i of the min/max tuple is paired with conjunct evaluator i.
- SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i];
- ScalarExprEvaluator* eval = min_max_conjunct_evals_[i];
-
- // Resolve column path to determine col idx.
- SchemaNode* node = nullptr;
- bool pos_field;
- bool missing_field;
- RETURN_IF_ERROR(schema_resolver_->ResolvePath(slot_desc->col_path(),
- &node, &pos_field, &missing_field));
-
- if (missing_field) {
- // We are selecting a column that is not in the file. We would set its slot to NULL
- // during the scan, so any predicate would evaluate to false. Return early. NULL
- // comparisons cannot happen here, since predicates with NULL literals are filtered
- // in the frontend.
- *skip_row_group = true;
- break;
- }
-
- if (pos_field) {
- // The planner should not send predicates with 'pos' for stats filtering to the BE.
- // In case there is a bug, we return an error, which will abort the query.
- stringstream err;
- err << "Statistics not supported for pos fields: " << slot_desc->DebugString();
- DCHECK(false) << err.str();
- return Status(err.str());  // NOTE(review): this early return bypasses the expr_results_pool() Clear() at the end of the function.
- }
-
- int col_idx = node->col_idx;
- DCHECK_LT(col_idx, row_group.columns.size());
-
- const vector<parquet::ColumnOrder>& col_orders = file_metadata.column_orders;
- const parquet::ColumnOrder* col_order = nullptr;
- if (col_idx < col_orders.size()) col_order = &col_orders[col_idx];  // Stays nullptr when the file has no column order entry for this column.
-
- const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
- const ColumnType& col_type = slot_desc->type();
-
- DCHECK(node->element != nullptr);
-
- ColumnStatsReader stat_reader(col_chunk, col_type, col_order, *node->element);
- if (col_type.IsTimestampType()) {
- stat_reader.SetTimestampDecoder(CreateTimestampDecoder(*node->element));
- }
-
- int64_t null_count = 0;
- bool null_count_result = stat_reader.ReadNullCountStat(&null_count);
- if (null_count_result && null_count == col_chunk.meta_data.num_values) {  // The null count equals the value count: every value in the chunk is NULL.
- *skip_row_group = true;
- break;
- }
-
- const string& fn_name = eval->root().function_name();
- ColumnStatsReader::StatsField stats_field;
- if (fn_name == "lt" || fn_name == "le") {
- // lt/le conjuncts are evaluated against the column's min statistic.
- stats_field = ColumnStatsReader::StatsField::MIN;
- } else if (fn_name == "gt" || fn_name == "ge") {
- // gt/ge conjuncts are evaluated against the column's max statistic.
- stats_field = ColumnStatsReader::StatsField::MAX;
- } else {
- DCHECK(false) << "Unsupported function name for statistics evaluation: " << fn_name;
- continue;  // Reached only when DCHECKs are compiled out: the conjunct is ignored for filtering.
- }
-
- void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset());
- bool stats_read = stat_reader.ReadFromThrift(stats_field, slot);  // Writes the chosen statistic into the slot on success.
-
- if (stats_read) {
- TupleRow row;
- row.SetTuple(0, min_max_tuple_);
- if (!ExecNode::EvalPredicate(eval, &row)) {  // The predicate fails on the statistic, so the row group is skipped.
- *skip_row_group = true;
- break;
- }
- }
- }
-
- // Free any expr result allocations accumulated during conjunct evaluation.
- context_->expr_results_pool()->Clear();
- return Status::OK();
-}
-
-Status HdfsParquetScanner::NextRowGroup() {  // Advances 'row_group_idx_' to the next row group this scanner should read, or to row_groups.size() if none is left.
- const ScanRange* split_range = static_cast<ScanRangeMetadata*>(
- metadata_range_->meta_data())->original_split;
- int64_t split_offset = split_range->offset();
- int64_t split_length = split_range->len();
-
- HdfsFileDesc* file_desc = scan_node_->GetFileDesc(
- context_->partition_descriptor()->id(), filename());
-
- bool start_with_first_row_group = row_group_idx_ == -1;  // True only on the first call for this scanner.
- bool misaligned_row_group_skipped = false;
-
- advance_row_group_ = false;
- row_group_rows_read_ = 0;
-
- // Loop until we have found a non-empty row group, and successfully initialized and
- // seeded the column readers. Return a non-OK status from within loop only if the error
- // is non-recoverable, otherwise log the error and continue with the next row group.
- while (true) {
- // Reset the parse status for the next row group.
- parse_status_ = Status::OK();
- // Make sure that we don't have leftover resources from the file metadata scan range
- // or previous row groups.
- DCHECK_EQ(0, context_->NumStreams());
-
- ++row_group_idx_;
- if (row_group_idx_ >= file_metadata_.row_groups.size()) {  // Ran out of row groups.
- if (start_with_first_row_group && misaligned_row_group_skipped) {
- // We started with the first row group and skipped all the row groups because
- // they were misaligned. The execution flow won't reach this point if there is at
- // least one non-empty row group which this scanner can process.
- COUNTER_ADD(num_scanners_with_no_reads_counter_, 1);
- }
- break;
- }
- const parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
- // Also check 'file_metadata_.num_rows' to make sure 'select count(*)' and 'select *'
- // behave consistently for corrupt files that have 'file_metadata_.num_rows == 0'
- // but some data in row groups.
- if (row_group.num_rows == 0 || file_metadata_.num_rows == 0) continue;
-
- RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets(
- file_desc->filename, file_desc->file_length, row_group));
-
- // A row group is processed by the scanner whose split overlaps with the row
- // group's mid point.
- int64_t row_group_mid_pos = GetRowGroupMidOffset(row_group);
- if (!(row_group_mid_pos >= split_offset &&
- row_group_mid_pos < split_offset + split_length)) {
- // The mid-point does not fall within the split, this row group will be handled by a
- // different scanner.
- // If the row group overlaps with the split, we found a misaligned row group.
- misaligned_row_group_skipped |= CheckRowGroupOverlapsSplit(row_group, split_range);
- continue;
- }
-
- COUNTER_ADD(num_row_groups_counter_, 1);  // Counted before any stats or dictionary filtering below.
-
- // Evaluate row group statistics.
- bool skip_row_group_on_stats;
- RETURN_IF_ERROR(
- EvaluateStatsConjuncts(file_metadata_, row_group, &skip_row_group_on_stats));
- if (skip_row_group_on_stats) {
- COUNTER_ADD(num_stats_filtered_row_groups_counter_, 1);
- continue;  // No release call here: StartScans() has not run yet for this row group.
- }
-
- InitCollectionColumns();
- RETURN_IF_ERROR(InitScalarColumns());
-
- // Start scanning dictionary filtering column readers, so we can read the dictionary
- // pages in EvalDictionaryFilters().
- RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(dict_filterable_readers_));
-
- // StartScans() may have allocated resources to scan columns. If we skip this row
- // group below, we must call ReleaseSkippedRowGroupResources() before continuing.
-
- // If there is a dictionary-encoded column where every value is eliminated
- // by a conjunct, the row group can be eliminated. This initializes dictionaries
- // for all columns visited.
- bool skip_row_group_on_dict_filters;
- Status status = EvalDictionaryFilters(row_group, &skip_row_group_on_dict_filters);
- if (!status.ok()) {
- // Either return an error or skip this row group if it is ok to ignore errors
- RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
- ReleaseSkippedRowGroupResources();
- continue;
- }
- if (skip_row_group_on_dict_filters) {
- COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
- ReleaseSkippedRowGroupResources();
- continue;
- }
-
- // At this point, the row group has passed any filtering criteria
- // Start scanning non-dictionary filtering column readers and initialize their
- // dictionaries.
- RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(non_dict_filterable_readers_));
- status = BaseScalarColumnReader::InitDictionaries(non_dict_filterable_readers_);
- if (!status.ok()) {
- // Either return an error or skip this row group if it is ok to ignore errors
- RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
- ReleaseSkippedRowGroupResources();
- continue;
- }
- DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
- break;  // This row group passed all checks and its readers are initialized.
- }
- DCHECK(parse_status_.ok());
- return Status::OK();
-}
-
-// Passes the resources of the current row group on to 'row_batch', which must not be
-// null: the dictionary pool's data and the scratch batch's resources go to the batch's
-// tuple data pool, completed context resources are released, every column reader is
-// closed against the batch, and the context's streams are cleared.
-void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
-  DCHECK(row_batch != nullptr);
-  MemPool* dst_pool = row_batch->tuple_data_pool();
-  dst_pool->AcquireData(dictionary_pool_.get(), false);
-  scratch_batch_->ReleaseResources(dst_pool);
-  context_->ReleaseCompletedResources(true);
-  for (size_t i = 0; i < column_readers_.size(); ++i) {
-    column_readers_[i]->Close(row_batch);
-  }
-  context_->ClearStreams();
-}
-
-// Frees the resources held for a row group that is being skipped. Nothing is handed
-// to a row batch: the dictionary pool is freed, the scratch batch and the column
-// readers are released with a null destination, and the context's streams are cleared.
-void HdfsParquetScanner::ReleaseSkippedRowGroupResources() {
-  dictionary_pool_->FreeAll();
-  scratch_batch_->ReleaseResources(nullptr);
-  context_->ReleaseCompletedResources(true);
-  for (size_t i = 0; i < column_readers_.size(); ++i) {
-    column_readers_[i]->Close(nullptr);
-  }
-  context_->ClearStreams();
-}
-
-// Returns true if dictionary filtering can be applied to 'col_reader'. All of the
-// following must hold:
-//  - The reader has a slot descriptor. Some queries do not materialize the column,
-//    e.g. a count(*) with no predicates only counts records, so the slot is NULL.
-//  - 'dict_filter_map_' has dictionary filter conjuncts for the reader's slot.
-//  - The values need no conversion. Certain datatypes (chars, timestamps) do not have
-//    the appropriate value in the file format and must be converted before return;
-//    this also applies to the dictionary values, so these types are skipped for now.
-//    TODO: The values should be converted during dictionary construction and stored
-//    in converted form in the dictionary.
-//  - The values need no validation. Certain datatypes (timestamps) have bit
-//    combinations that are not valid, and the dictionary values are not validated.
-//    TODO: This should be pushed into dictionary construction.
-bool HdfsParquetScanner::IsDictFilterable(BaseScalarColumnReader* col_reader) {
-  const SlotDescriptor* slot = col_reader->slot_desc();
-  if (slot == nullptr) return false;
-  const bool has_dict_conjuncts =
-      dict_filter_map_.find(slot->id()) != dict_filter_map_.end();
-  return has_dict_conjuncts && !col_reader->NeedsConversion()
-      && !col_reader->NeedsValidation();
-}
-
-// Recursively sorts 'readers' into the scanner's reader lists. Collection readers go
-// to 'collection_readers_' and their children are partitioned in turn. Scalar readers
-// go to 'scalar_readers_' and additionally to either 'dict_filterable_readers_' (when
-// 'can_eval_dict_filters' is set and the reader is dictionary filterable) or
-// 'non_dict_filterable_readers_'.
-void HdfsParquetScanner::PartitionReaders(
-    const vector<ParquetColumnReader*>& readers, bool can_eval_dict_filters) {
-  for (ParquetColumnReader* current : readers) {
-    if (!current->IsCollectionReader()) {
-      BaseScalarColumnReader* scalar = static_cast<BaseScalarColumnReader*>(current);
-      scalar_readers_.push_back(scalar);
-      const bool use_dict_filter = can_eval_dict_filters && IsDictFilterable(scalar);
-      if (use_dict_filter) {
-        dict_filterable_readers_.push_back(scalar);
-      } else {
-        non_dict_filterable_readers_.push_back(scalar);
-      }
-      continue;
-    }
-    CollectionColumnReader* collection = static_cast<CollectionColumnReader*>(current);
-    collection_readers_.push_back(collection);
-    PartitionReaders(*collection->children(), can_eval_dict_filters);
-  }
-}
-
-// Partitions the column readers and allocates, from 'perm_pool_', one tuple buffer per
-// tuple descriptor that has a dictionary-filterable reader. Returns a memory-limit
-// error if a buffer cannot be allocated.
-Status HdfsParquetScanner::InitDictFilterStructures() {
-  // Dictionary filters are only evaluated when the query option enables them and there
-  // is at least one dictionary filter conjunct.
-  const bool can_eval_dict_filters =
-      state_->query_options().parquet_dictionary_filtering && !dict_filter_map_.empty();
-
-  // Separate column readers into scalar and collection readers.
-  PartitionReaders(column_readers_, can_eval_dict_filters);
-
-  for (auto* reader : dict_filterable_readers_) {
-    const TupleDescriptor* desc = reader->slot_desc()->parent();
-    // Several readers can share a tuple descriptor; allocate its buffer only once.
-    if (dict_filter_tuple_map_.find(desc) != dict_filter_tuple_map_.end()) continue;
-    const int num_bytes = desc->byte_size();
-    if (num_bytes <= 0) continue;
-    uint8_t* tuple_mem = perm_pool_->TryAllocate(num_bytes);
-    if (tuple_mem == nullptr) {
-      string details = Substitute(
-          PARQUET_MEM_LIMIT_EXCEEDED, "InitDictFilterStructures", num_bytes,
-          "Dictionary Filtering Tuple");
-      return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, num_bytes);
-    }
-    dict_filter_tuple_map_[desc] = reinterpret_cast<Tuple*>(tuple_mem);
-  }
-  return Status::OK();
-}
-
-// Returns true if the column chunk described by 'col_metadata' is entirely dictionary
-// encoded.
-//
-// The Parquet spec allows column chunks with mixed encodings, where some data pages
-// are dictionary encoded and others are plain encoded. For example, a writer might
-// start a column chunk dictionary encoded and switch to plain encoding once the
-// dictionary grows too large. Dictionary filters may only skip a row group when the
-// conjuncts are evaluated on chunks that are dictionary encoded throughout, which is
-// verified in one of two ways:
-//  1. If the metadata has 'encoding_stats', which records the number of data pages
-//     written per encoding, no data page may use a non-dictionary encoding.
-//  2. Otherwise the 'encodings' list must contain the dictionary encoding and no
-//     unexpected encoding, i.e. nothing besides the encodings associated with
-//     definition/repetition levels.
-bool HdfsParquetScanner::IsDictionaryEncoded(
-    const parquet::ColumnMetaData& col_metadata) {
-  if (col_metadata.__isset.encoding_stats) {
-    // Check #1: look for a non-empty group of data pages that is not dictionary encoded.
-    for (const parquet::PageEncodingStats& page_stats : col_metadata.encoding_stats) {
-      const bool is_data_page = page_stats.page_type == parquet::PageType::DATA_PAGE;
-      const bool is_dict = page_stats.encoding == parquet::Encoding::PLAIN_DICTIONARY;
-      if (is_data_page && !is_dict && page_stats.count > 0) return false;
-    }
-    return true;
-  }
-
-  // Check #2: inspect the list of encodings used by the column chunk.
-  bool saw_dict_encoding = false;
-  for (const parquet::Encoding::type& enc : col_metadata.encodings) {
-    if (enc == parquet::Encoding::PLAIN_DICTIONARY) {
-      saw_dict_encoding = true;
-      continue;
-    }
-    // RLE and BIT_PACKED are used for repetition/definition levels.
-    if (enc == parquet::Encoding::RLE || enc == parquet::Encoding::BIT_PACKED) continue;
-    // Any other encoding means that some data is not dictionary encoded.
-    return false;
-  }
-  // Without a dictionary encoding in the list the chunk is not dictionary encoded.
-  return saw_dict_encoding;
-}
-
-/// Evaluates the dictionary-filterable conjuncts against the dictionaries of the
-/// columns in 'dict_filterable_readers_' for 'row_group'.
-/// Sets '*row_group_eliminated' to true and returns OK as soon as one column's
-/// dictionary contains no value that passes its conjuncts; otherwise leaves it false.
-/// Readers whose column chunk is not known to be entirely dictionary encoded are
-/// skipped for filtering and have their dictionaries initialized at the end, which
-/// only happens if the row group was not eliminated.
-/// Returns a non-OK status if initializing a dictionary fails.
-Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_group,
- bool* row_group_eliminated) {
- *row_group_eliminated = false;
- // Check if there's anything to do here.
- if (dict_filterable_readers_.empty()) return Status::OK();
-
- // Legacy impala files (< 2.9) require special handling, because they do not encode
- // information about whether the column is 100% dictionary encoded.
- bool is_legacy_impala = false;
- if (file_version_.application == "impala" && file_version_.VersionLt(2,9,0)) {
- is_legacy_impala = true;
- }
-
- // Keeps track of column readers that need to be initialized. For example, if a
- // column cannot be filtered, then defer its dictionary initialization once we know
- // the row group cannot be filtered.
- vector<BaseScalarColumnReader*> deferred_dict_init_list;
- // Keeps track of the initialized tuple associated with a TupleDescriptor.
- unordered_map<const TupleDescriptor*, Tuple*> tuple_map;
- for (BaseScalarColumnReader* scalar_reader : dict_filterable_readers_) {
- const parquet::ColumnMetaData& col_metadata =
- row_group.columns[scalar_reader->col_idx()].meta_data;
-
- // Legacy impala files cannot be eliminated here, because the only way to
- // determine whether the column is 100% dictionary encoded requires reading
- // the dictionary.
- if (!is_legacy_impala && !IsDictionaryEncoded(col_metadata)) {
- // We cannot guarantee that this reader is 100% dictionary encoded,
- // so dictionary filters cannot be used. Defer initializing its dictionary
- // until after the other filters have been evaluated.
- deferred_dict_init_list.push_back(scalar_reader);
- continue;
- }
-
- RETURN_IF_ERROR(scalar_reader->InitDictionary());
- DictDecoderBase* dictionary = scalar_reader->GetDictionaryDecoder();
- // No dictionary decoder is available for this reader, so there is nothing to filter on.
- if (!dictionary) continue;
-
- // Legacy (version < 2.9) Impala files do not spill to PLAIN encoding until
- // it reaches the maximum number of dictionary entries. If the dictionary
- // has fewer entries, then it is 100% dictionary encoded.
- if (is_legacy_impala &&
- dictionary->num_entries() >= LEGACY_IMPALA_MAX_DICT_ENTRIES) continue;
-
- const SlotDescriptor* slot_desc = scalar_reader->slot_desc();
- DCHECK(slot_desc != nullptr);
- const TupleDescriptor* tuple_desc = slot_desc->parent();
- auto dict_filter_it = dict_filter_map_.find(slot_desc->id());
- DCHECK(dict_filter_it != dict_filter_map_.end());
- const vector<ScalarExprEvaluator*>& dict_filter_conjunct_evals =
- dict_filter_it->second;
- // Look up the scratch tuple for this tuple descriptor. It is re-initialized the
- // first time it is used in this call and then reused for later columns of the
- // same tuple.
- Tuple* dict_filter_tuple = nullptr;
- auto dict_filter_tuple_it = tuple_map.find(tuple_desc);
- if (dict_filter_tuple_it == tuple_map.end()) {
- auto tuple_it = dict_filter_tuple_map_.find(tuple_desc);
- DCHECK(tuple_it != dict_filter_tuple_map_.end());
- dict_filter_tuple = tuple_it->second;
- dict_filter_tuple->Init(tuple_desc->byte_size());
- tuple_map[tuple_desc] = dict_filter_tuple;
- } else {
- dict_filter_tuple = dict_filter_tuple_it->second;
- }
-
- DCHECK(dict_filter_tuple != nullptr);
- void* slot = dict_filter_tuple->GetSlot(slot_desc->tuple_offset());
- bool column_has_match = false;
- for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) {
- if (dict_idx % 1024 == 0) {
- // Don't let expr result allocations accumulate too much for large dictionaries or
- // many row groups.
- context_->expr_results_pool()->Clear();
- }
- // Write the dictionary value into the slot of the scratch tuple so the conjuncts
- // can be evaluated against it.
- dictionary->GetValue(dict_idx, slot);
-
- // We can only eliminate this row group if no value from the dictionary matches.
- // If any dictionary value passes the conjuncts, then move on to the next column.
- TupleRow row;
- row.SetTuple(0, dict_filter_tuple);
- if (ExecNode::EvalConjuncts(dict_filter_conjunct_evals.data(),
- dict_filter_conjunct_evals.size(), &row)) {
- column_has_match = true;
- break;
- }
- }
- // Free all expr result allocations now that we're done with the filter.
- context_->expr_results_pool()->Clear();
-
- if (!column_has_match) {
- // The column contains no value that matches the conjunct. The row group
- // can be eliminated.
- *row_group_eliminated = true;
- return Status::OK();
- }
- }
-
- // Any columns that were not 100% dictionary encoded need to initialize
- // their dictionaries here.
- RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(deferred_dict_init_list));
-
- return Status::OK();
-}
-
-/// High-level steps of this function:
-/// 1. Allocate 'scratch' memory for tuples able to hold a full batch
-/// 2. Populate the slots of all scratch tuples one column reader at a time,
-///    using the ColumnReader::Read*ValueBatch() functions.
-/// 3. Evaluate runtime filters and conjuncts against the scratch tuples and
-///    set the surviving tuples in the output batch. Transfer the ownership of
-///    scratch memory to the output batch once the scratch memory is exhausted.
-/// 4. Repeat steps above until we are done with the row group or an error
-///    occurred.
-/// If a column reader fails or the readers disagree on the number of values, the row
-/// group's resources are flushed, '*skip_row_group' is set to true and OK is returned;
-/// a value-count mismatch is additionally merged into 'parse_status_'.
-/// TODO: Since the scratch batch is populated in a column-wise fashion, it is
-/// difficult to maintain a maximum memory footprint without throwing away at least
-/// some work. This point needs further experimentation and thought.
-Status HdfsParquetScanner::AssembleRows(
-    const vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch,
-    bool* skip_row_group) {
-  DCHECK(!column_readers.empty());
-  DCHECK(row_batch != nullptr);
-  DCHECK_EQ(*skip_row_group, false);
-  DCHECK(scratch_batch_ != nullptr);
-
-  int64_t num_rows_read = 0;
-  while (!column_readers[0]->RowGroupAtEnd()) {
-    // Start a new scratch batch.
-    RETURN_IF_ERROR(scratch_batch_->Reset(state_));
-    InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem, scratch_batch_->capacity);
-
-    // Materialize the top-level slots into the scratch batch column-by-column.
-    int last_num_tuples = -1;
-    for (int c = 0; c < column_readers.size(); ++c) {
-      ParquetColumnReader* col_reader = column_readers[c];
-      bool continue_execution;
-      if (col_reader->max_rep_level() > 0) {
-        continue_execution = col_reader->ReadValueBatch(&scratch_batch_->aux_mem_pool,
-            scratch_batch_->capacity, tuple_byte_size_, scratch_batch_->tuple_mem,
-            &scratch_batch_->num_tuples);
-      } else {
-        continue_execution = col_reader->ReadNonRepeatedValueBatch(
-            &scratch_batch_->aux_mem_pool, scratch_batch_->capacity, tuple_byte_size_,
-            scratch_batch_->tuple_mem, &scratch_batch_->num_tuples);
-      }
-      // Check that all column readers populated the same number of values. Remember
-      // what this reader produced now, because the scratch batch is reset below before
-      // the error message is built.
-      const int num_tuples_read = scratch_batch_->num_tuples;
-      bool num_tuples_mismatch = c != 0 && last_num_tuples != num_tuples_read;
-      if (UNLIKELY(!continue_execution || num_tuples_mismatch)) {
-        // Skipping this row group. Free up all the resources with this row group.
-        FlushRowGroupResources(row_batch);
-        scratch_batch_->num_tuples = 0;
-        DCHECK(scratch_batch_->AtEnd());
-        *skip_row_group = true;
-        if (num_tuples_mismatch && continue_execution) {
-          // Report the count this column produced against the count established by
-          // the columns read before it.
-          Status err(Substitute("Corrupt Parquet file '$0': column '$1' "
-              "had $2 remaining values but expected $3", filename(),
-              col_reader->schema_element().name, num_tuples_read, last_num_tuples));
-          parse_status_.MergeStatus(err);
-        }
-        return Status::OK();
-      }
-      last_num_tuples = num_tuples_read;
-    }
-    num_rows_read += scratch_batch_->num_tuples;
-    int num_row_to_commit = TransferScratchTuples(row_batch);
-    RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
-    if (row_batch->AtCapacity()) break;
-  }
-  row_group_rows_read_ += num_rows_read;
-  COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
-  // Merge Scanner-local counter into HdfsScanNode counter and reset.
-  COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
-  coll_items_read_counter_ = 0;
-  return Status::OK();
-}
-
-/// Commits 'num_rows' rows to 'dst_batch', then checks for cancellation and for a
-/// query-level error before releasing the expr result allocations of this thread.
-/// Returns a cancelled status if the scanner context was cancelled, the query status
-/// if it is an error, and OK otherwise.
-Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
-  DCHECK(dst_batch != nullptr);
-  dst_batch->CommitRows(num_rows);
-
-  const bool was_cancelled = context_->cancelled();
-  if (was_cancelled) {
-    return Status::CancelledInternal("Parquet scanner");
-  }
-
-  // TODO: Propagating UDF errors through the global RuntimeState is a bad idea. Keep
-  // the UDF error in thread local storage, or have the UDF return a status, so that it
-  // can be merged with parse_status_.
-  RETURN_IF_ERROR(state_->GetQueryStatus());
-
-  // Evaluating the scanner conjuncts allocates expr results; drop them here so they
-  // do not pile up across batches.
-  context_->expr_results_pool()->Clear();
-  return Status::OK();
-}
-
-/// Moves tuples from the scratch batch into 'dst_batch' and returns the number of
-/// rows that the caller should commit to 'dst_batch'.
-int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
-  // Callers must not invoke this with a full output batch. Calling CommitRows() after
-  // every TransferScratchTuples() keeps the output batch from being full on entry.
-  DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
-  DCHECK_EQ(scan_node_->tuple_idx(), 0);
-  DCHECK_EQ(dst_batch->row_desc()->tuple_descriptors().size(), 1);
-
-  if (scratch_batch_->tuple_byte_size != 0) {
-    // Regular path: run the scratch tuples through the codegen'd function when one is
-    // available and through the interpreted ProcessScratchBatch() otherwise.
-    const int num_rows_to_commit = codegend_process_scratch_batch_fn_ != nullptr ?
-        codegend_process_scratch_batch_fn_(this, dst_batch) :
-        ProcessScratchBatch(dst_batch);
-    scratch_batch_->FinalizeTupleTransfer(dst_batch, num_rows_to_commit);
-    return num_rows_to_commit;
-  }
-
-  // Zero-byte tuples: a collection with empty tuples is being materialized. Emit one
-  // NULL tuple pointer per remaining scratch tuple, limited by the free space in the
-  // output batch. There are no filters or conjuncts to evaluate in this case.
-  Tuple** first_output_row =
-      reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows()));
-  DCHECK(filter_ctxs_.empty());
-  DCHECK(conjunct_evals_->empty());
-  int num_tuples = min(dst_batch->capacity() - dst_batch->num_rows(),
-      scratch_batch_->num_tuples - scratch_batch_->tuple_idx);
-  memset(first_output_row, 0, num_tuples * sizeof(Tuple*));
-  scratch_batch_->tuple_idx += num_tuples;
-  // Empty tuples need no backing memory, so nothing should be attached to the batch.
-  DCHECK_EQ(0, scratch_batch_->total_allocated_bytes());
-  return num_tuples;
-}
-
-/// Builds the codegen'd version of ProcessScratchBatch() for 'node' by replacing the
-/// EvalConjuncts and EvalRuntimeFilters call sites in the IR function with versions
-/// generated for 'conjuncts' and for the filter exprs of 'node'.
-/// On success '*process_scratch_batch_fn' is the finalized function; on failure it is
-/// left as nullptr and a non-OK status is returned.
-Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
-    const vector<ScalarExpr*>& conjuncts, llvm::Function** process_scratch_batch_fn) {
-  DCHECK(node->runtime_state()->ShouldCodegen());
-  *process_scratch_batch_fn = nullptr;
-  LlvmCodeGen* llvm_codegen = node->runtime_state()->codegen();
-  DCHECK(llvm_codegen != nullptr);
-
-  llvm::Function* scratch_batch_fn =
-      llvm_codegen->GetFunction(IRFunction::PROCESS_SCRATCH_BATCH, true);
-  DCHECK(scratch_batch_fn != nullptr);
-
-  // Step 1: substitute the conjunct evaluation.
-  llvm::Function* conjuncts_fn;
-  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(llvm_codegen, conjuncts, &conjuncts_fn));
-  DCHECK(conjuncts_fn != nullptr);
-  int num_replaced =
-      llvm_codegen->ReplaceCallSites(scratch_batch_fn, conjuncts_fn, "EvalConjuncts");
-  DCHECK_REPLACE_COUNT(num_replaced, 1);
-
-  // Step 2: substitute the runtime filter evaluation.
-  llvm::Function* runtime_filters_fn;
-  RETURN_IF_ERROR(CodegenEvalRuntimeFilters(
-      llvm_codegen, node->filter_exprs(), &runtime_filters_fn));
-  DCHECK(runtime_filters_fn != nullptr);
-  num_replaced = llvm_codegen->ReplaceCallSites(
-      scratch_batch_fn, runtime_filters_fn, "EvalRuntimeFilters");
-  DCHECK_REPLACE_COUNT(num_replaced, 1);
-
-  // Step 3: name and finalize the patched function.
-  scratch_batch_fn->setName("ProcessScratchBatch");
-  *process_scratch_batch_fn = llvm_codegen->FinalizeFunction(scratch_batch_fn);
-  if (*process_scratch_batch_fn != nullptr) return Status::OK();
-  return Status("Failed to finalize process_scratch_batch_fn.");
-}
-
-/// Reads the items of the current collection from 'column_readers' into
-/// 'coll_value_builder', committing only the items that pass the conjuncts
-/// registered for the item tuple. Reading stops once the repetition level of the
-/// first reader drops to 'new_collection_rep_level' or below, or when execution
-/// should not continue (error, cancellation or limit reached).
-/// Returns false if the caller should stop executing; errors from obtaining
-/// collection memory are recorded in 'parse_status_'.
-bool HdfsParquetScanner::AssembleCollection(
-    const vector<ParquetColumnReader*>& column_readers, int new_collection_rep_level,
-    CollectionValueBuilder* coll_value_builder) {
-  DCHECK(!column_readers.empty());
-  DCHECK_GE(new_collection_rep_level, 0);
-  DCHECK(coll_value_builder != nullptr);
-
-  const TupleDescriptor* tuple_desc = &coll_value_builder->tuple_desc();
-  Tuple* template_tuple = template_tuple_map_[tuple_desc];
-  // Bind by reference rather than copying the evaluator vector on every call. The
-  // standard associative containers keep references to existing elements valid when
-  // other keys are inserted.
-  const vector<ScalarExprEvaluator*>& evals =
-      conjunct_evals_map_[tuple_desc->id()];
-
-  int64_t rows_read = 0;
-  bool continue_execution = !scan_node_->ReachedLimit() && !context_->cancelled();
-  // Note that this will be set to true at the end of the row group or the end of the
-  // current collection (if applicable).
-  bool end_of_collection = column_readers[0]->rep_level() == -1;
-  // We only initialize end_of_collection to true here if we're at the end of the row
-  // group (otherwise it would always be true because we're on the "edge" of two
-  // collections). This function is not expected to be called at the end of the row
-  // group.
-  DCHECK(!end_of_collection);
-
-  while (!end_of_collection && continue_execution) {
-    MemPool* pool;
-    Tuple* tuple;
-    TupleRow* row = nullptr;
-
-    int64_t num_rows;
-    // We're assembling item tuples into a CollectionValue
-    parse_status_ =
-        GetCollectionMemory(coll_value_builder, &pool, &tuple, &row, &num_rows);
-    if (UNLIKELY(!parse_status_.ok())) {
-      continue_execution = false;
-      break;
-    }
-    // 'num_rows' can be very high if we're writing to a large CollectionValue. Limit
-    // the number of rows we read at one time so we don't spend too long in the
-    // 'num_rows' loop below before checking for cancellation or limit reached.
-    num_rows = min(
-        num_rows, static_cast<int64_t>(scan_node_->runtime_state()->batch_size()));
-
-    int num_to_commit = 0;
-    int row_idx = 0;
-    for (row_idx = 0; row_idx < num_rows && !end_of_collection; ++row_idx) {
-      DCHECK(continue_execution);
-      // A tuple is produced iff the collection that contains its values is not empty and
-      // non-NULL. (Empty or NULL collections produce no output values, whereas NULL is
-      // output for the fields of NULL structs.)
-      bool materialize_tuple = column_readers[0]->def_level() >=
-          column_readers[0]->def_level_of_immediate_repeated_ancestor();
-      InitTuple(tuple_desc, template_tuple, tuple);
-      continue_execution =
-          ReadCollectionItem(column_readers, materialize_tuple, pool, tuple);
-      if (UNLIKELY(!continue_execution)) break;
-      end_of_collection = column_readers[0]->rep_level() <= new_collection_rep_level;
-
-      if (materialize_tuple) {
-        // Only advance to the next tuple slot when the item is kept; a rejected item
-        // is overwritten by the next one.
-        if (ExecNode::EvalConjuncts(evals.data(), evals.size(), row)) {
-          tuple = next_tuple(tuple_desc->byte_size(), tuple);
-          ++num_to_commit;
-        }
-      }
-    }
-
-    rows_read += row_idx;
-    coll_value_builder->CommitTuples(num_to_commit);
-    continue_execution &= !scan_node_->ReachedLimit() && !context_->cancelled();
-  }
-  coll_items_read_counter_ += rows_read;
-  if (end_of_collection) {
-    // All column readers should report the start of the same collection.
-    for (int c = 1; c < column_readers.size(); ++c) {
-      FILE_CHECK_EQ(column_readers[c]->rep_level(), column_readers[0]->rep_level());
-    }
-  }
-  return continue_execution;
-}
-
-/// Advances every reader in 'column_readers' by one collection item. When
-/// 'materialize_tuple' is true the position slot (if the reader has one) and the value
-/// are written into 'tuple', using 'pool' for the value; otherwise only the levels of
-/// each reader are advanced. Returns false as soon as one reader reports that
-/// execution should stop, true otherwise.
-inline bool HdfsParquetScanner::ReadCollectionItem(
-    const vector<ParquetColumnReader*>& column_readers,
-    bool materialize_tuple, MemPool* pool, Tuple* tuple) const {
-  DCHECK(!column_readers.empty());
-  for (ParquetColumnReader* reader : column_readers) {
-    bool keep_going;
-    if (!materialize_tuple) {
-      // A containing repeated field is empty or NULL, so there is no value to read.
-      FILE_CHECK_LT(reader->def_level(),
-          reader->def_level_of_immediate_repeated_ancestor());
-      keep_going = reader->NextLevels();
-    } else {
-      // Every column reader of this tuple should have a value to materialize.
-      FILE_CHECK_GE(reader->def_level(),
-          reader->def_level_of_immediate_repeated_ancestor());
-      // Populate the position slot first when this reader carries one.
-      const SlotDescriptor* position_slot = reader->pos_slot_desc();
-      if (position_slot != nullptr) {
-        reader->ReadPositionNonBatched(
-            tuple->GetBigIntSlot(position_slot->tuple_offset()));
-      }
-      keep_going = reader->ReadValue(pool, tuple);
-    }
-    if (UNLIKELY(!keep_going)) return false;
-  }
-  return true;
-}
-
-/// Reads and validates the file footer from 'stream_' and deserializes the file
-/// metadata into 'file_metadata_'. If the metadata is larger than the footer scan
-/// range, an additional scan range covering exactly the metadata is read into a
-/// temporary buffer first.
-/// Returns a non-OK status if the footer cannot be read, the magic number or metadata
-/// is invalid, memory for the metadata cannot be allocated, or the metadata reports
-/// no row groups or a negative row count. A file with zero rows returns OK (after
-/// logging a warning if a row group claims to be non-empty).
-Status HdfsParquetScanner::ProcessFooter() {
-  const int64_t file_len = stream_->file_desc()->file_length;
-  const int64_t scan_range_len = stream_->scan_range()->len();
-
-  // We're processing the scan range issued in IssueInitialRanges(). The scan range should
-  // be the last FOOTER_BYTES of the file. !success means the file is shorter than we
-  // expect. Note we can't detect if the file is larger than we expect without attempting
-  // to read past the end of the scan range, but in this case we'll fail below trying to
-  // parse the footer.
-  DCHECK_LE(scan_range_len, FOOTER_SIZE);
-  uint8_t* buffer;
-  bool success = stream_->ReadBytes(scan_range_len, &buffer, &parse_status_);
-  if (!success) {
-    DCHECK(!parse_status_.ok());
-    if (parse_status_.code() == TErrorCode::SCANNER_INCOMPLETE_READ) {
-      VLOG_QUERY << "Metadata for file '" << filename() << "' appears stale: "
-                 << "metadata states file size to be "
-                 << PrettyPrinter::Print(file_len, TUnit::BYTES)
-                 << ", but could only read "
-                 << PrettyPrinter::Print(stream_->total_bytes_returned(), TUnit::BYTES);
-      return Status(TErrorCode::STALE_METADATA_FILE_TOO_SHORT, filename(),
-          scan_node_->hdfs_table()->fully_qualified_name());
-    }
-    return parse_status_;
-  }
-  DCHECK(stream_->eosr());
-
-  // Number of bytes in buffer after the fixed size footer is accounted for.
-  int remaining_bytes_buffered = scan_range_len - sizeof(int32_t) -
-      sizeof(PARQUET_VERSION_NUMBER);
-
-  // Make sure footer has enough bytes to contain the required information.
-  if (remaining_bytes_buffered < 0) {
-    return Status(Substitute("File '$0' is invalid. Missing metadata.", filename()));
-  }
-
-  // Validate magic file bytes are correct.
-  uint8_t* magic_number_ptr = buffer + scan_range_len - sizeof(PARQUET_VERSION_NUMBER);
-  if (memcmp(magic_number_ptr, PARQUET_VERSION_NUMBER,
-             sizeof(PARQUET_VERSION_NUMBER)) != 0) {
-    return Status(TErrorCode::PARQUET_BAD_VERSION_NUMBER, filename(),
-        string(reinterpret_cast<char*>(magic_number_ptr), sizeof(PARQUET_VERSION_NUMBER)),
-        scan_node_->hdfs_table()->fully_qualified_name());
-  }
-
-  // The size of the metadata is encoded as a 4 byte little endian value before
-  // the magic number. The field has no alignment guarantee within the buffer, so copy
-  // the bytes out instead of dereferencing a uint32_t pointer into it.
-  uint8_t* metadata_size_ptr = magic_number_ptr - sizeof(int32_t);
-  uint32_t metadata_size;
-  memcpy(&metadata_size, metadata_size_ptr, sizeof(metadata_size));
-  // Set below, once we know whether the metadata is fully contained in 'buffer'.
-  uint8_t* metadata_ptr = nullptr;
-  // The start of the metadata is:
-  // file_len - 4-byte footer length field - 4-byte version number field - metadata size
-  int64_t metadata_start = file_len - sizeof(int32_t) - sizeof(PARQUET_VERSION_NUMBER) -
-      metadata_size;
-
-  // If the metadata was too big, we need to read it into a contiguous buffer before
-  // deserializing it.
-  ScopedBuffer metadata_buffer(scan_node_->mem_tracker());
-
-  DCHECK(metadata_range_ != nullptr);
-  if (UNLIKELY(metadata_size > remaining_bytes_buffered)) {
-    // In this case, the metadata is bigger than our guess meaning there are
-    // not enough bytes in the footer range from IssueInitialRanges().
-    // We'll just issue more ranges to the IoMgr that is the actual footer.
-    int64_t partition_id = context_->partition_descriptor()->id();
-    const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
-    DCHECK_EQ(file_desc, stream_->file_desc());
-    if (metadata_start < 0) {
-      return Status(Substitute("File '$0' is invalid. Invalid metadata size in file "
-          "footer: $1 bytes. File size: $2 bytes.", filename(), metadata_size, file_len));
-    }
-
-    if (!metadata_buffer.TryAllocate(metadata_size)) {
-      string details = Substitute("Could not allocate buffer of $0 bytes for Parquet "
-          "metadata for file '$1'.", metadata_size, filename());
-      return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, metadata_size);
-    }
-    metadata_ptr = metadata_buffer.buffer();
-
-    // Read the footer into the metadata buffer.
-    ScanRange* metadata_range = scan_node_->AllocateScanRange(
-        metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id,
-        metadata_range_->disk_id(), metadata_range_->expected_local(),
-        BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
-
-    unique_ptr<BufferDescriptor> io_buffer;
-    bool needs_buffers;
-    RETURN_IF_ERROR(
-        scan_node_->reader_context()->StartScanRange(metadata_range, &needs_buffers));
-    DCHECK(!needs_buffers) << "Already provided a buffer";
-    RETURN_IF_ERROR(metadata_range->GetNext(&io_buffer));
-    DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
-    DCHECK_EQ(io_buffer->len(), metadata_size);
-    DCHECK(io_buffer->eosr());
-    metadata_range->ReturnBuffer(move(io_buffer));
-  } else {
-    // The metadata sits directly in front of the size field inside 'buffer'. Only
-    // compute the pointer here, where it is known to stay within the buffer.
-    metadata_ptr = metadata_size_ptr - metadata_size;
-  }
-
-  // Deserialize file footer
-  // TODO: this takes ~7ms for a 1000-column table, figure out how to reduce this.
-  Status status =
-      DeserializeThriftMsg(metadata_ptr, &metadata_size, true, &file_metadata_);
-  if (!status.ok()) {
-    return Status(Substitute("File '$0' of length $1 bytes has invalid file metadata "
-        "at file offset $2, Error = $3.", filename(), file_len, metadata_start,
-        status.GetDetail()));
-  }
-
-  RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(file_metadata_, filename()));
-
-  // IMPALA-3943: Do not throw an error for empty files for backwards compatibility.
-  if (file_metadata_.num_rows == 0) {
-    // Warn if the num_rows is inconsistent with the row group metadata.
-    if (!file_metadata_.row_groups.empty()) {
-      bool has_non_empty_row_group = false;
-      for (const parquet::RowGroup& row_group : file_metadata_.row_groups) {
-        if (row_group.num_rows > 0) {
-          has_non_empty_row_group = true;
-          break;
-        }
-      }
-      // Warn if there is at least one non-empty row group.
-      if (has_non_empty_row_group) {
-        ErrorMsg msg(TErrorCode::PARQUET_ZERO_ROWS_IN_NON_EMPTY_FILE, filename());
-        state_->LogError(msg);
-      }
-    }
-    return Status::OK();
-  }
-
-  // Parse out the created by application version string
-  if (file_metadata_.__isset.created_by) {
-    file_version_ = ParquetFileVersion(file_metadata_.created_by);
-  }
-  if (file_metadata_.row_groups.empty()) {
-    return Status(
-        Substitute("Invalid file. This file: $0 has no row groups", filename()));
-  }
-  if (file_metadata_.num_rows < 0) {
-    return Status(Substitute("Corrupt Parquet file '$0': negative row count $1 in "
-        "file metadata", filename(), file_metadata_.num_rows));
-  }
-  return Status::OK();
-}
-
-/// Creates one column reader per slot of 'tuple_desc' that resolves to a field of the
-/// file schema and appends it to 'column_readers', recursing into collection slots.
-/// Slots whose path is missing from the file are set to NULL in the template tuple
-/// instead. If no reader was created, a single counting reader is added. A position
-/// slot, if present, is attached to the first reader.
-/// Returns a non-OK status if path resolution, column validation or creation of the
-/// counting reader fails.
-Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc,
-    const ParquetSchemaResolver& schema_resolver,
-    vector<ParquetColumnReader*>* column_readers) {
-  DCHECK(column_readers != nullptr);
-  DCHECK(column_readers->empty());
-
-  // When the count(*) optimization is enabled no column is read, so there is nothing
-  // to create.
-  if (scan_node_->optimize_parquet_count_star()) return Status::OK();
-
-  // A tuple has at most one position slot; it is handled after all other slots.
-  SlotDescriptor* position_slot = nullptr;
-  const bool is_scan_tuple = &tuple_desc == scan_node_->tuple_desc();
-
-  for (SlotDescriptor* slot : tuple_desc.slots()) {
-    // Skip partition key columns of the scan node's own tuple.
-    if (is_scan_tuple && slot->col_pos() < scan_node_->num_partition_keys()) {
-      continue;
-    }
-
-    SchemaNode* schema_node = nullptr;
-    bool is_pos_field;
-    bool is_missing_field;
-    RETURN_IF_ERROR(schema_resolver.ResolvePath(
-        slot->col_path(), &schema_node, &is_pos_field, &is_missing_field));
-
-    if (is_missing_field) {
-      // The selected column does not exist in this file: mark the slot NULL in the
-      // template tuple, creating the template tuple on first use.
-      Tuple*& template_tuple = template_tuple_map_[&tuple_desc];
-      if (template_tuple == nullptr) {
-        template_tuple =
-            Tuple::Create(tuple_desc.byte_size(), template_tuple_pool_.get());
-      }
-      template_tuple->SetNull(slot->null_indicator_offset());
-      continue;
-    }
-
-    if (is_pos_field) {
-      DCHECK(position_slot == nullptr)
-          << "There should only be one position slot per tuple";
-      position_slot = slot;
-      continue;
-    }
-
-    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumn(filename(),
-        *schema_node->element, slot, state_));
-
-    ParquetColumnReader* reader = ParquetColumnReader::Create(
-        *schema_node, slot->type().IsCollectionType(), slot, this);
-    column_readers->push_back(reader);
-
-    if (!reader->IsCollectionReader()) continue;
-
-    // Collection reader: create the readers for its item tuple recursively.
-    DCHECK(slot->collection_item_descriptor() != nullptr);
-    const TupleDescriptor* item_desc = slot->collection_item_descriptor();
-    CollectionColumnReader* coll_reader = static_cast<CollectionColumnReader*>(reader);
-    RETURN_IF_ERROR(
-        CreateColumnReaders(*item_desc, schema_resolver, coll_reader->children()));
-  }
-
-  if (column_readers->empty()) {
-    // This is either a count(*) over a collection type (count(*) over the table is
-    // handled in ProcessFooter()), or no materialized columns appear in this file
-    // (e.g. due to schema evolution, or if there's only a position slot). A single
-    // reader is created only to count the tuples to output; no values are read from
-    // it.
-    ParquetColumnReader* counting_reader;
-    RETURN_IF_ERROR(CreateCountingReader(
-        tuple_desc.tuple_path(), schema_resolver, &counting_reader));
-    column_readers->push_back(counting_reader);
-  }
-
-  if (position_slot != nullptr) {
-    // Let an already existing reader populate the position slot of 'tuple_desc'.
-    DCHECK(!column_readers->empty());
-    column_readers->front()->set_pos_slot_desc(position_slot);
-  }
-
-  return Status::OK();
-}
-
-/// Creates in '*reader' a column reader that is used only to count the tuples under
-/// 'parent_path'. If the chosen schema node is repeated, the reader is a collection
-/// reader with a single scalar child reader.
-/// Returns a non-OK status if 'parent_path' cannot be resolved or is missing from the
-/// file.
-Status HdfsParquetScanner::CreateCountingReader(const SchemaPath& parent_path,
-    const ParquetSchemaResolver& schema_resolver, ParquetColumnReader** reader) {
-  SchemaNode* parent_node;
-  bool is_pos_field;
-  bool is_missing_field;
-  RETURN_IF_ERROR(schema_resolver.ResolvePath(
-      parent_path, &parent_node, &is_pos_field, &is_missing_field));
-
-  if (is_missing_field) {
-    // TODO: can we do anything else here?
-    return Status(Substitute("Could not find '$0' in file '$1'.",
-        PrintPath(*scan_node_->hdfs_table(), parent_path), filename()));
-  }
-  DCHECK(!is_pos_field);
-  DCHECK(parent_path.empty() || parent_node->is_repeated());
-
-  if (parent_node->children.empty()) {
-    // Special case for a repeated scalar node. The repeated node represents both the
-    // parent collection and the child item.
-    *reader = ParquetColumnReader::Create(*parent_node, false, nullptr, this);
-    return Status::OK();
-  }
-
-  // Walk down the first children until reaching a node that is not a struct, i.e. a
-  // collection or a scalar. That node backs the item reader.
-  const SchemaNode* item_node = &parent_node->children[0];
-  while (!item_node->children.empty() && !item_node->is_repeated()) {
-    item_node = &item_node->children[0];
-  }
-
-  *reader = ParquetColumnReader::Create(
-      *item_node, item_node->is_repeated(), nullptr, this);
-  if (!item_node->is_repeated()) return Status::OK();
-
-  // Breadth-first search for the closest scalar descendant of 'item_node'; a scalar
-  // reader on it drives 'reader'. Picking the least-nested descendant is a heuristic
-  // for choosing a descendant with fewer values, so it is faster to scan.
-  // TODO: use different heuristic than least-nested? Fewest values?
-  const SchemaNode* scalar_node = nullptr;
-  queue<const SchemaNode*> frontier;
-  frontier.push(item_node);
-  while (!frontier.empty()) {
-    scalar_node = frontier.front();
-    frontier.pop();
-    // The first childless node dequeued is the least-nested scalar descendant.
-    if (scalar_node->children.empty()) break;
-    for (const SchemaNode& child : scalar_node->children) frontier.push(&child);
-  }
-  DCHECK(scalar_node->children.empty()) << scalar_node->DebugString();
-  CollectionColumnReader* coll_reader = static_cast<CollectionColumnReader*>(*reader);
-  coll_reader->children()->push_back(
-      ParquetColumnReader::Create(*scalar_node, false, nullptr, this));
-
-  return Status::OK();
-}
-
-/// Calls Reset() on every reader in 'collection_readers_'.
-void HdfsParquetScanner::InitCollectionColumns() {
-  for (CollectionColumnReader* coll_reader : collection_readers_) coll_reader->Reset();
-}
-
-/// Resets every reader in 'scalar_readers_' for the column chunks of the current row
-/// group and then divides the I/O reservation between them.
-/// Returns a non-OK status if two readers on the same schema element see different
-/// value counts, if resetting a reader fails, or if dividing the reservation fails.
-Status HdfsParquetScanner::InitScalarColumns() {
-  int64_t partition_id = context_->partition_descriptor()->id();
-  const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
-  DCHECK(file_desc != nullptr);
-  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
-
-  // Value count first seen for each SchemaElement. Every later reader on the same
-  // element must report the same count.
-  unordered_map<const parquet::SchemaElement*, int> num_values_map;
-  for (BaseScalarColumnReader* reader : scalar_readers_) {
-    const parquet::ColumnChunk& chunk = row_group.columns[reader->col_idx()];
-    const parquet::SchemaElement* element = &reader->schema_element();
-    auto seen_it = num_values_map.find(element);
-    if (seen_it == num_values_map.end()) {
-      num_values_map[element] = chunk.meta_data.num_values;
-    } else {
-      int expected_num_values = seen_it->second;
-      // A recorded count of -1 is treated the same as "nothing recorded yet".
-      if (expected_num_values != -1 &&
-          chunk.meta_data.num_values != expected_num_values) {
-        // TODO: improve this error message by saying which columns are different,
-        // and also specify column in other error messages as appropriate
-        return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, reader->col_idx(),
-            chunk.meta_data.num_values, expected_num_values, filename());
-      }
-    }
-    RETURN_IF_ERROR(reader->Reset(*file_desc, chunk, row_group_idx_));
-  }
-  RETURN_IF_ERROR(DivideReservationBetweenColumns(scalar_readers_));
-  return Status::OK();
-}
-
-/// Distributes the scanner's total reservation between 'column_readers' based on the
-/// length of each reader's scan range, after first trying to grow the reservation to
-/// the ideal amount. The actual and ideal reservation are recorded in the scan node's
-/// runtime profile.
-/// Returns an internal error if the reservation does not cover one minimum-sized
-/// buffer per column.
-Status HdfsParquetScanner::DivideReservationBetweenColumns(
-    const vector<BaseScalarColumnReader*>& column_readers) {
-  DiskIoMgr* disk_io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
-  const int64_t min_buffer_size = disk_io_mgr->min_buffer_size();
-  const int64_t max_buffer_size = disk_io_mgr->max_buffer_size();
-  const auto num_columns = column_readers.size();
-
-  // The HdfsScanNode reservation calculation in the planner ensures that we have
-  // reservation for at least one buffer per column.
-  if (context_->total_reservation() < min_buffer_size * num_columns) {
-    return Status(TErrorCode::INTERNAL_ERROR,
-        Substitute("Not enough reservation in Parquet scanner for file '$0'. Need at "
-                   "least $1 bytes per column for $2 columns but had $3 bytes",
-            filename(), min_buffer_size, num_columns,
-            context_->total_reservation()));
-  }
-
-  // Collect the scan range length of every column, in reader order.
-  vector<int64_t> range_lengths;
-  range_lengths.reserve(num_columns);
-  for (BaseScalarColumnReader* reader : column_readers) {
-    range_lengths.push_back(reader->scan_range()->len());
-  }
-
-  // The scanner-wide stream was used only to read the file footer. Each column has added
-  // its own stream. We can use the total reservation now that 'stream_''s resources have
-  // been released. We may benefit from increasing reservation further, so let's compute
-  // the ideal reservation to scan all the columns.
-  const int64_t ideal_reservation = ComputeIdealReservation(range_lengths);
-  if (context_->total_reservation() < ideal_reservation) {
-    context_->TryIncreaseReservation(ideal_reservation);
-  }
-  scan_node_->runtime_profile()->GetSummaryStatsCounter(ACTUAL_RESERVATION_COUNTER_NAME)->
-      UpdateCounter(context_->total_reservation());
-  scan_node_->runtime_profile()->GetSummaryStatsCounter(IDEAL_RESERVATION_COUNTER_NAME)->
-      UpdateCounter(ideal_reservation);
-
-  // Each entry pairs a column index with the reservation assigned to that column.
-  vector<pair<int, int64_t>> assignments = DivideReservationBetweenColumnsHelper(
-      min_buffer_size, max_buffer_size, range_lengths, context_->total_reservation());
-  for (const pair<int, int64_t>& assignment : assignments) {
-    column_readers[assignment.first]->set_io_reservation(assignment.second);
-  }
-  return Status::OK();
-}
-
-int64_t HdfsParquetScanner::ComputeIdealReservation(  // Sums the I/O manager's ideal buffer reservation over all columns.
- const vector<int64_t>& col_range_lengths) {  // one scan range length per column
- DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
- int64_t ideal_reservation = 0;
- for (int64_t len : col_range_lengths) {
- ideal_reservation += io_mgr->ComputeIdealBufferReservation(len);  // what "ideal" means for one range is decided by the I/O manager
- }
- return ideal_reservation;  // stays 0 when 'col_range_lengths' is empty
-}
-
-vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper(  // Decides how much reservation each column gets.
- int64_t min_buffer_size, int64_t max_buffer_size,
- const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) {  // the by-value amount doubles as the running remainder
- // Pair of (column index, reservation allocated).
- vector<pair<int, int64_t>> tmp_reservations;
- for (int i = 0; i < col_range_lengths.size(); ++i) tmp_reservations.emplace_back(i, 0);
-
- // Sort in descending order of length, breaking ties by index so that larger columns
- // get allocated reservation first. It is common to have dramatically different column
- // sizes in a single file because of different value sizes and compressibility. E.g.
- // consider a large STRING "comment" field versus a highly compressible
- // dictionary-encoded column with only a few distinct values. We want to give max-sized
- // buffers to large columns first to maximize the size of I/Os that we do while reading
- // this row group.
- sort(tmp_reservations.begin(), tmp_reservations.end(),
- [&col_range_lengths](const pair<int, int64_t>& left, const pair<int, int64_t>& right) {
- int64_t left_len = col_range_lengths[left.first];
- int64_t right_len = col_range_lengths[right.first];
- return left_len != right_len ? left_len > right_len : left.first < right.first;
- });
-
- // Set aside the minimum reservation per column. It is handed back one column at a
- // time during the first pass below, so every column can get a buffer on that pass.
- reservation_to_distribute -= min_buffer_size * col_range_lengths.size();
-
- // Allocate reservations to columns by repeatedly allocating either a max-sized buffer
- // or a large enough buffer to fit the remaining data for each column. Do this
- // round-robin up to the ideal number of I/O buffers.
- for (int i = 0; i < DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) {
- for (auto& tmp_reservation : tmp_reservations) {
- // Add back the reservation we set aside above.
- if (i == 0) reservation_to_distribute += min_buffer_size;
-
- int64_t bytes_left_in_range =
- col_range_lengths[tmp_reservation.first] - tmp_reservation.second;  // may be <= 0 once the column is fully covered
- int64_t bytes_to_add;
- if (bytes_left_in_range >= max_buffer_size) {
- if (reservation_to_distribute >= max_buffer_size) {
- bytes_to_add = max_buffer_size;
- } else if (i == 0) {
- DCHECK_EQ(0, tmp_reservation.second);
- // Ensure this range gets at least one buffer on the first iteration.
- bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute);
- } else {
- DCHECK_GT(tmp_reservation.second, 0);
- // We need to read more than the max buffer size, but can't allocate a
- // max-sized buffer. Stop adding buffers to this column: we prefer to use
- // the existing max-sized buffers without small buffers mixed in so that
- // we will always do max-sized I/Os, which make efficient use of I/O devices.
- bytes_to_add = 0;
- }
- } else if (bytes_left_in_range > 0 &&
- reservation_to_distribute >= min_buffer_size) {
- // Choose a buffer size that will fit the rest of the bytes left in the range.
- bytes_to_add = max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range));
- // But don't add more reservation than is available.
- bytes_to_add =
- min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute));
- } else {
- bytes_to_add = 0;  // column already covered, or less than one minimum buffer is left to give
- }
- DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add;
- reservation_to_distribute -= bytes_to_add;
- tmp_reservation.second += bytes_to_add;
- DCHECK_GE(reservation_to_distribute, 0);
- DCHECK_GT(tmp_reservation.second, 0);  // NOTE(review): a zero-length range gets bytes_to_add == 0 on pass 0 and would trip this; presumably callers never pass one - confirm
- }
- }
- return tmp_reservations;  // still in sorted (longest column first) order; '.first' maps back to the column index
-}
-
-Status HdfsParquetScanner::InitDictionaries(  // Calls InitDictionary() on every reader, in vector order.
- const vector<BaseScalarColumnReader*>& column_readers) {
- for (BaseScalarColumnReader* scalar_reader : column_readers) {
- RETURN_IF_ERROR(scalar_reader->InitDictionary());  // presumably returns the first non-OK status at once, skipping the remaining readers
- }
- return Status::OK();  // reached only if RETURN_IF_ERROR never returned early above
-}
-
-Status HdfsParquetScanner::ValidateEndOfRowGroup(  // Checks that what was read for a row group agrees with the file metadata.
- const vector<ParquetColumnReader*>& column_readers, int row_group_idx, int64_t rows_read) {  // 'row_group_idx' indexes file_metadata_.row_groups
- DCHECK(!column_readers.empty());  // column_readers[0] is dereferenced below
- DCHECK(parse_status_.ok()) << "Don't overwrite parse_status_"
- << parse_status_.GetDetail();
-
- if (column_readers[0]->max_rep_level() == 0) {  // only the first reader is consulted for this decision
- // These column readers materialize table-level values (vs. collection values). Test
- // if the expected number of rows from the file metadata matches the actual number of
- // rows read from the file.
- int64_t expected_rows_in_group = file_metadata_.row_groups[row_group_idx].num_rows;
- if (rows_read != expected_rows_in_group) {
- return Status(TErrorCode::PARQUET_GROUP_ROW_COUNT_ERROR, filename(), row_group_idx,
- expected_rows_in_group, rows_read);
- }
- }
-
- // Validate scalar column readers' state. Collection readers are skipped.
- int num_values_read = -1;  // -1 until the first scalar reader supplies the reference value
- for (int c = 0; c < column_readers.size(); ++c) {
- if (column_readers[c]->IsCollectionReader()) continue;
- BaseScalarColumnReader* reader =
- static_cast<BaseScalarColumnReader*>(column_readers[c]);  // unchecked downcast; assumes every non-collection reader is a scalar reader - TODO confirm
- // All readers should have exhausted the final data page. This could fail if one
- // column has more values than stated in the metadata, meaning the final data page
- // will still have unread values.
- if (reader->num_buffered_values_ != 0) {
- return Status(Substitute("Corrupt Parquet metadata in file '$0': metadata reports "
- "'$1' more values in data page than actually present", filename(),
- reader->num_buffered_values_));
- }
- // Sanity check that the num_values_read_ value is the same for all readers. All
- // readers should have been advanced in lockstep (the above check is more likely to
- // fail if this is not the case though, since num_values_read_ is only updated at the
- // end of a data page).
- if (num_values_read == -1) num_values_read = reader->num_values_read_;
- DCHECK_EQ(reader->num_values_read_, num_values_read);
- // ReadDataPage() uses metadata_->num_values to determine when the column's done
- DCHECK(reader->num_values_read_ == reader->metadata_->num_values ||
- !state_->abort_on_error());  // i.e. the equality is only required when abort_on_error() is true
- }
- return Status::OK();
-}
-
-ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder(  // Builds the timestamp decoder for one schema element of this file.
- const parquet::SchemaElement& element) {
- bool timestamp_conversion_needed_for_int96_timestamps =
- FLAGS_convert_legacy_hive_parquet_utc_timestamps &&  // the flag alone is not enough...
- file_version_.application == "parquet-mr";  // ...the file's recorded application must also be exactly "parquet-mr"
- // The decoder is given the element, a pointer to the query's local time zone, and the flag computed above.
- return ParquetTimestampDecoder(element, &state_->local_time_zone(),
- timestamp_conversion_needed_for_int96_timestamps);
-}
-}