You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jr...@apache.org on 2018/01/13 07:50:27 UTC
[2/4] impala git commit: IMPALA-6383: free memory after skipping
parquet row groups
IMPALA-6383: free memory after skipping parquet row groups
Before this patch, resources were only flushed after breaking out of
NextRowGroup(). This is a problem because resources can be allocated
for skipped row groups (e.g. for reading dictionaries).
Testing:
Tested in conjunction with a prototype buffer pool patch that was
DCHECKing before the change.
Added DCHECKs to the current version to ensure the streams are cleaned
up as expected.
Change-Id: Ibc2f8f27c9b238be60261539f8d4be2facb57a2b
Reviewed-on: http://gerrit.cloudera.org:8080/9002
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/10fb24af
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/10fb24af
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/10fb24af
Branch: refs/heads/master
Commit: 10fb24afb966c567adcf632a314f6af1826f19fc
Parents: df3a440
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jan 10 15:35:41 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Jan 13 02:48:08 2018 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-parquet-scanner.cc | 26 +++++++++++++++++++++-----
be/src/exec/hdfs-parquet-scanner.h | 5 +++++
be/src/exec/scanner-context.h | 8 ++++----
3 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/10fb24af/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index f0f280d..6380722 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -228,6 +228,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
// Release I/O buffers immediately to make sure they are cleaned up
// in case we return a non-OK status anywhere below.
context_->ReleaseCompletedResources(true);
+ context_->ClearStreams();
RETURN_IF_ERROR(footer_status);
// Parse the file schema into an internal representation for schema resolution.
@@ -263,7 +264,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
}
} else {
template_tuple_pool_->FreeAll();
- dictionary_pool_.get()->FreeAll();
+ dictionary_pool_->FreeAll();
context_->ReleaseCompletedResources(true);
for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
// The scratch batch may still contain tuple data. We can get into this case if
@@ -478,7 +479,6 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
// Transfer resources and clear streams if there is any leftover from the previous
// row group. We will create new streams for the next row group.
FlushRowGroupResources(row_batch);
- context_->ClearStreams();
if (!advance_row_group_) {
Status status =
ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_);
@@ -619,6 +619,9 @@ Status HdfsParquetScanner::NextRowGroup() {
while (true) {
// Reset the parse status for the next row group.
parse_status_ = Status::OK();
+ // Make sure that we don't have leftover resources from the file metadata scan range
+ // or previous row groups.
+ DCHECK_EQ(0, context_->NumStreams());
++row_group_idx_;
if (row_group_idx_ >= file_metadata_.row_groups.size()) {
@@ -669,6 +672,9 @@ Status HdfsParquetScanner::NextRowGroup() {
// of the column.
RETURN_IF_ERROR(InitColumns(row_group_idx_, dict_filterable_readers_));
+ // InitColumns() may have allocated resources to scan columns. If we skip this row
+ // group below, we must call ReleaseSkippedRowGroupResources() before continuing.
+
// If there is a dictionary-encoded column where every value is eliminated
// by a conjunct, the row group can be eliminated. This initializes dictionaries
// for all columns visited.
@@ -677,10 +683,12 @@ Status HdfsParquetScanner::NextRowGroup() {
if (!status.ok()) {
// Either return an error or skip this row group if it is ok to ignore errors
RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+ ReleaseSkippedRowGroupResources();
continue;
}
if (skip_row_group_on_dict_filters) {
COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
+ ReleaseSkippedRowGroupResources();
continue;
}
@@ -692,6 +700,7 @@ Status HdfsParquetScanner::NextRowGroup() {
if (!status.ok()) {
// Either return an error or skip this row group if it is ok to ignore errors
RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+ ReleaseSkippedRowGroupResources();
continue;
}
@@ -730,9 +739,16 @@ void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
scratch_batch_->ReleaseResources(row_batch->tuple_data_pool());
context_->ReleaseCompletedResources(true);
- for (ParquetColumnReader* col_reader : column_readers_) {
- col_reader->Close(row_batch);
- }
+ for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(row_batch);
+ context_->ClearStreams();
+}
+
+void HdfsParquetScanner::ReleaseSkippedRowGroupResources() {
+ dictionary_pool_->FreeAll();
+ scratch_batch_->ReleaseResources(nullptr);
+ context_->ReleaseCompletedResources(true);
+ for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
+ context_->ClearStreams();
}
bool HdfsParquetScanner::IsDictFilterable(ParquetColumnReader* col_reader) {
http://git-wip-us.apache.org/repos/asf/impala/blob/10fb24af/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 99b5a60..cea06ed 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -642,6 +642,11 @@ class HdfsParquetScanner : public HdfsScanner {
/// Should be called after completing a row group and when returning the last batch.
void FlushRowGroupResources(RowBatch* row_batch);
+ /// Releases resources associated with a row group that was skipped and closes all
+ /// column readers. Should be called after skipping a row group from which no rows
+ /// were returned.
+ void ReleaseSkippedRowGroupResources();
+
/// Evaluates whether the column reader is eligible for dictionary predicates
bool IsDictFilterable(ParquetColumnReader* col_reader);
http://git-wip-us.apache.org/repos/asf/impala/blob/10fb24af/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index e316063..09a4bdc 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -89,7 +89,6 @@ class ScannerContext {
ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
MemPool* expr_results_pool);
-
/// Destructor verifies that all stream objects have been released.
~ScannerContext();
@@ -338,6 +337,8 @@ class ScannerContext {
return streams_[idx].get();
}
+ int NumStreams() const { return streams_.size(); }
+
/// Release completed resources for all streams, e.g. the last buffer in each stream if
/// the current read position is at the end of the buffer. If 'done' is true all
/// resources are freed, even if the caller has not read that data yet. After calling
@@ -354,8 +355,8 @@ class ScannerContext {
/// size to 0.
void ClearStreams();
- /// Add a stream to this ScannerContext for 'range'. Returns the added stream.
- /// The stream is created in the runtime state's object pool
+ /// Add a stream to this ScannerContext for 'range'. The stream is owned by this
+ /// context.
Stream* AddStream(io::ScanRange* range);
/// Returns false if scan_node_ is multi-threaded and has been cancelled.
@@ -370,7 +371,6 @@ class ScannerContext {
RuntimeState* state_;
HdfsScanNodeBase* scan_node_;
-
HdfsPartitionDescriptor* partition_desc_;
/// Vector of streams. Non-columnar formats will always have one stream per context.