You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/29 04:21:39 UTC
[4/4] impala git commit: IMPALA-6383: free memory after skipping
parquet row groups
IMPALA-6383: free memory after skipping parquet row groups
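Before this patch, resources were only flushed after breaking out of the
row group loop in NextRowGroup(). This is a problem because resources can
be allocated for skipped row groups (e.g. for reading dictionaries), and
those resources stayed allocated while the loop moved on to later row groups.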
Testing:
Tested in conjunction with a prototype buffer pool patch that was
DCHECKing before the change.
Added DCHECKs to the current version to ensure the streams are cleared
as expected.
Ran the repro for IMPALA-6419 to confirm this iteration of the patch
fixed the original problem.
Change-Id: I95713675455f7635fa3f72616b166f35e2a46c1a
Reviewed-on: http://gerrit.cloudera.org:8080/9059
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c0c3ba7f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c0c3ba7f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c0c3ba7f
Branch: refs/heads/2.x
Commit: c0c3ba7f45d2897083c5d6d4442b95ea7da1c3e0
Parents: 42ce1f9
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jan 10 15:35:41 2018 -0800
Committer: Philip Zeyliger <ph...@cloudera.com>
Committed: Thu Jan 25 16:30:18 2018 -0800
----------------------------------------------------------------------
be/src/exec/hdfs-parquet-scanner.cc | 52 +++++++++++++++++++++-----------
be/src/exec/hdfs-parquet-scanner.h | 5 +++
be/src/exec/scanner-context.h | 8 ++---
3 files changed, 44 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/c0c3ba7f/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 3a17a3b..c14edd7 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -229,6 +229,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
// Release I/O buffers immediately to make sure they are cleaned up
// in case we return a non-OK status anywhere below.
context_->ReleaseCompletedResources(true);
+ context_->ClearStreams();
RETURN_IF_ERROR(footer_status);
// Parse the file schema into an internal representation for schema resolution.
@@ -264,7 +265,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
}
} else {
template_tuple_pool_->FreeAll();
- dictionary_pool_.get()->FreeAll();
+ dictionary_pool_->FreeAll();
context_->ReleaseCompletedResources(true);
for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
// The scratch batch may still contain tuple data. We can get into this case if
@@ -479,7 +480,6 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
// Transfer resources and clear streams if there is any leftover from the previous
// row group. We will create new streams for the next row group.
FlushRowGroupResources(row_batch);
- context_->ClearStreams();
if (!advance_row_group_) {
Status status =
ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_);
@@ -620,6 +620,9 @@ Status HdfsParquetScanner::NextRowGroup() {
while (true) {
// Reset the parse status for the next row group.
parse_status_ = Status::OK();
+ // Make sure that we don't have leftover resources from the file metadata scan range
+ // or previous row groups.
+ DCHECK_EQ(0, context_->NumStreams());
++row_group_idx_;
if (row_group_idx_ >= file_metadata_.row_groups.size()) {
@@ -672,6 +675,9 @@ Status HdfsParquetScanner::NextRowGroup() {
// of the column.
RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, dict_filterable_readers_));
+ // InitColumns() may have allocated resources to scan columns. If we skip this row
+ // group below, we must call ReleaseSkippedRowGroupResources() before continuing.
+
// If there is a dictionary-encoded column where every value is eliminated
// by a conjunct, the row group can be eliminated. This initializes dictionaries
// for all columns visited.
@@ -680,10 +686,12 @@ Status HdfsParquetScanner::NextRowGroup() {
if (!status.ok()) {
// Either return an error or skip this row group if it is ok to ignore errors
RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+ ReleaseSkippedRowGroupResources();
continue;
}
if (skip_row_group_on_dict_filters) {
COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
+ ReleaseSkippedRowGroupResources();
continue;
}
@@ -695,10 +703,11 @@ Status HdfsParquetScanner::NextRowGroup() {
if (!status.ok()) {
// Either return an error or skip this row group if it is ok to ignore errors
RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+ ReleaseSkippedRowGroupResources();
continue;
}
- bool seeding_ok = true;
+ bool seeding_failed = false;
for (ParquetColumnReader* col_reader: column_readers_) {
// Seed collection and boolean column readers with NextLevel().
// The ScalarColumnReaders use an optimized ReadValueBatch() that
@@ -707,19 +716,21 @@ Status HdfsParquetScanner::NextRowGroup() {
// ScalarColumnReader::ReadValueBatch() which does not need seeding. This
// will allow better sharing of code between the row-wise and column-wise
// materialization strategies.
- if (col_reader->NeedsSeedingForBatchedReading()) {
- if (!col_reader->NextLevels()) {
- seeding_ok = false;
- break;
- }
+ if (col_reader->NeedsSeedingForBatchedReading()
+ && !col_reader->NextLevels()) {
+ seeding_failed = true;
+ break;
}
- DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
}
-
- if (!parse_status_.ok()) {
- RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
- } else if (seeding_ok) {
- // Found a non-empty row group and successfully initialized the column readers.
+ if (seeding_failed) {
+ if (!parse_status_.ok()) {
+ RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
+ }
+ ReleaseSkippedRowGroupResources();
+ continue;
+ } else {
+ // Seeding succeeded - we're ready to read the row group.
+ DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
break;
}
}
@@ -733,9 +744,16 @@ void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
scratch_batch_->ReleaseResources(row_batch->tuple_data_pool());
context_->ReleaseCompletedResources(true);
- for (ParquetColumnReader* col_reader : column_readers_) {
- col_reader->Close(row_batch);
- }
+ for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(row_batch);
+ context_->ClearStreams();
+}
+
+void HdfsParquetScanner::ReleaseSkippedRowGroupResources() {
+ dictionary_pool_->FreeAll();
+ scratch_batch_->ReleaseResources(nullptr);
+ context_->ReleaseCompletedResources(true);
+ for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
+ context_->ClearStreams();
}
bool HdfsParquetScanner::IsDictFilterable(BaseScalarColumnReader* col_reader) {
http://git-wip-us.apache.org/repos/asf/impala/blob/c0c3ba7f/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 2ddf0fc..b1409d7 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -654,6 +654,11 @@ class HdfsParquetScanner : public HdfsScanner {
/// Should be called after completing a row group and when returning the last batch.
void FlushRowGroupResources(RowBatch* row_batch);
+ /// Releases resources associated with a row group that was skipped and closes all
+ /// column readers. Should be called after skipping a row group from which no rows
+ /// were returned.
+ void ReleaseSkippedRowGroupResources();
+
/// Evaluates whether the column reader is eligible for dictionary predicates
bool IsDictFilterable(ParquetColumnReader* col_reader);
http://git-wip-us.apache.org/repos/asf/impala/blob/c0c3ba7f/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index e316063..09a4bdc 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -89,7 +89,6 @@ class ScannerContext {
ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
MemPool* expr_results_pool);
-
/// Destructor verifies that all stream objects have been released.
~ScannerContext();
@@ -338,6 +337,8 @@ class ScannerContext {
return streams_[idx].get();
}
+ int NumStreams() const { return streams_.size(); }
+
/// Release completed resources for all streams, e.g. the last buffer in each stream if
/// the current read position is at the end of the buffer. If 'done' is true all
/// resources are freed, even if the caller has not read that data yet. After calling
@@ -354,8 +355,8 @@ class ScannerContext {
/// size to 0.
void ClearStreams();
- /// Add a stream to this ScannerContext for 'range'. Returns the added stream.
- /// The stream is created in the runtime state's object pool
+ /// Add a stream to this ScannerContext for 'range'. The stream is owned by this
+ /// context.
Stream* AddStream(io::ScanRange* range);
/// Returns false if scan_node_ is multi-threaded and has been cancelled.
@@ -370,7 +371,6 @@ class ScannerContext {
RuntimeState* state_;
HdfsScanNodeBase* scan_node_;
-
HdfsPartitionDescriptor* partition_desc_;
/// Vector of streams. Non-columnar formats will always have one stream per context.