Posted to commits@impala.apache.org by jr...@apache.org on 2018/01/13 07:50:27 UTC

[2/4] impala git commit: IMPALA-6383: free memory after skipping parquet row groups

IMPALA-6383: free memory after skipping parquet row groups

Before this patch, scan range resources were only flushed after breaking
out of the row group loop in NextRowGroup(). This is a problem because
resources can be allocated for row groups that are then skipped (e.g.
dictionary pages read for dictionary filtering).
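
To illustrate the fix, here is a minimal, self-contained C++ sketch of the
skip-and-release pattern the patch introduces. All types and members below
(Stream, ScannerContext, RowGroupScanner, ShouldSkip) are simplified
hypothetical stand-ins for the real Impala classes, not the actual
implementation:

  #include <cassert>
  #include <vector>

  // Hypothetical stand-ins for the real Impala classes; simplified for
  // illustration only.
  struct Stream {};

  struct ScannerContext {
    std::vector<Stream> streams;
    int NumStreams() const { return static_cast<int>(streams.size()); }
    void ReleaseCompletedResources(bool /*done*/) { /* would free I/O buffers */ }
    void ClearStreams() { streams.clear(); }
  };

  class RowGroupScanner {
   public:
    RowGroupScanner(ScannerContext* ctx, int num_row_groups)
        : context_(ctx), num_row_groups_(num_row_groups) {}

    // Returns true if a row group was selected, false when the file is done.
    bool NextRowGroup() {
      while (true) {
        // Invariant added by the patch: no streams may be left over from the
        // file metadata scan range or a previously skipped row group.
        assert(context_->NumStreams() == 0);
        if (++row_group_idx_ >= num_row_groups_) return false;
        // Stands in for InitColumns(), which allocates streams (and may read
        // dictionary pages).
        context_->streams.push_back(Stream{});
        if (ShouldSkip(row_group_idx_)) {
          // The fix: release everything before 'continue'; otherwise the
          // resources of every skipped row group accumulate.
          ReleaseSkippedRowGroupResources();
          continue;
        }
        return true;
      }
    }

   private:
    // Frees resources of a row group from which no rows were returned.
    void ReleaseSkippedRowGroupResources() {
      context_->ReleaseCompletedResources(/*done=*/true);
      context_->ClearStreams();
    }

    // Arbitrary stand-in for dictionary filtering of row groups.
    bool ShouldSkip(int idx) const { return idx % 2 == 0; }

    ScannerContext* context_;
    int row_group_idx_ = -1;
    int num_row_groups_;
  };

With the streams cleared on every skip, the DCHECK at the top of the loop
holds across iterations, which is exactly what the DCHECK added to
NextRowGroup() in the diff below verifies.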

Testing:
Tested in conjunction with a prototype buffer pool patch that hit DCHECKs
before this change.

Added DCHECKs to the current version to ensure that the streams are
cleaned up as expected.

Change-Id: Ibc2f8f27c9b238be60261539f8d4be2facb57a2b
Reviewed-on: http://gerrit.cloudera.org:8080/9002
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/10fb24af
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/10fb24af
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/10fb24af

Branch: refs/heads/master
Commit: 10fb24afb966c567adcf632a314f6af1826f19fc
Parents: df3a440
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jan 10 15:35:41 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Jan 13 02:48:08 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 26 +++++++++++++++++++++-----
 be/src/exec/hdfs-parquet-scanner.h  |  5 +++++
 be/src/exec/scanner-context.h       |  8 ++++----
 3 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/10fb24af/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index f0f280d..6380722 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -228,6 +228,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
   context_->ReleaseCompletedResources(true);
+  context_->ClearStreams();
   RETURN_IF_ERROR(footer_status);
 
   // Parse the file schema into an internal representation for schema resolution.
@@ -263,7 +264,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
     }
   } else {
     template_tuple_pool_->FreeAll();
-    dictionary_pool_.get()->FreeAll();
+    dictionary_pool_->FreeAll();
     context_->ReleaseCompletedResources(true);
     for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
     // The scratch batch may still contain tuple data. We can get into this case if
@@ -478,7 +479,6 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
     // Transfer resources and clear streams if there is any leftover from the previous
     // row group. We will create new streams for the next row group.
     FlushRowGroupResources(row_batch);
-    context_->ClearStreams();
     if (!advance_row_group_) {
       Status status =
           ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_);
@@ -619,6 +619,9 @@ Status HdfsParquetScanner::NextRowGroup() {
   while (true) {
     // Reset the parse status for the next row group.
     parse_status_ = Status::OK();
+    // Make sure that we don't have leftover resources from the file metadata scan range
+    // or previous row groups.
+    DCHECK_EQ(0, context_->NumStreams());
 
     ++row_group_idx_;
     if (row_group_idx_ >= file_metadata_.row_groups.size()) {
@@ -669,6 +672,9 @@ Status HdfsParquetScanner::NextRowGroup() {
     // of the column.
     RETURN_IF_ERROR(InitColumns(row_group_idx_, dict_filterable_readers_));
 
+    // InitColumns() may have allocated resources to scan columns. If we skip this row
+    // group below, we must call ReleaseSkippedRowGroupResources() before continuing.
+
     // If there is a dictionary-encoded column where every value is eliminated
     // by a conjunct, the row group can be eliminated. This initializes dictionaries
     // for all columns visited.
@@ -677,10 +683,12 @@ Status HdfsParquetScanner::NextRowGroup() {
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
       RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+      ReleaseSkippedRowGroupResources();
       continue;
     }
     if (skip_row_group_on_dict_filters) {
       COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
+      ReleaseSkippedRowGroupResources();
       continue;
     }
 
@@ -692,6 +700,7 @@ Status HdfsParquetScanner::NextRowGroup() {
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
       RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+      ReleaseSkippedRowGroupResources();
       continue;
     }
 
@@ -730,9 +739,16 @@ void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
   row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
   scratch_batch_->ReleaseResources(row_batch->tuple_data_pool());
   context_->ReleaseCompletedResources(true);
-  for (ParquetColumnReader* col_reader : column_readers_) {
-    col_reader->Close(row_batch);
-  }
+  for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(row_batch);
+  context_->ClearStreams();
+}
+
+void HdfsParquetScanner::ReleaseSkippedRowGroupResources() {
+  dictionary_pool_->FreeAll();
+  scratch_batch_->ReleaseResources(nullptr);
+  context_->ReleaseCompletedResources(true);
+  for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
+  context_->ClearStreams();
 }
 
 bool HdfsParquetScanner::IsDictFilterable(ParquetColumnReader* col_reader) {

http://git-wip-us.apache.org/repos/asf/impala/blob/10fb24af/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 99b5a60..cea06ed 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -642,6 +642,11 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Should be called after completing a row group and when returning the last batch.
   void FlushRowGroupResources(RowBatch* row_batch);
 
+  /// Releases resources associated with a row group that was skipped and closes all
+  /// column readers. Should be called after skipping a row group from which no rows
+  /// were returned.
+  void ReleaseSkippedRowGroupResources();
+
   /// Evaluates whether the column reader is eligible for dictionary predicates
   bool IsDictFilterable(ParquetColumnReader* col_reader);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/10fb24af/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index e316063..09a4bdc 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -89,7 +89,6 @@ class ScannerContext {
   ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
       io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool);
-
   /// Destructor verifies that all stream objects have been released.
   ~ScannerContext();
 
@@ -338,6 +337,8 @@ class ScannerContext {
     return streams_[idx].get();
   }
 
+  int NumStreams() const { return streams_.size(); }
+
   /// Release completed resources for all streams, e.g. the last buffer in each stream if
   /// the current read position is at the end of the buffer. If 'done' is true all
   /// resources are freed, even if the caller has not read that data yet. After calling
@@ -354,8 +355,8 @@ class ScannerContext {
   /// size to 0.
   void ClearStreams();
 
-  /// Add a stream to this ScannerContext for 'range'. Returns the added stream.
-  /// The stream is created in the runtime state's object pool
+  /// Add a stream to this ScannerContext for 'range'. The stream is owned by this
+  /// context.
   Stream* AddStream(io::ScanRange* range);
 
   /// Returns false if scan_node_ is multi-threaded and has been cancelled.
@@ -370,7 +371,6 @@ class ScannerContext {
 
   RuntimeState* state_;
   HdfsScanNodeBase* scan_node_;
-
   HdfsPartitionDescriptor* partition_desc_;
 
   /// Vector of streams. Non-columnar formats will always have one stream per context.