Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/29 04:21:39 UTC

[4/4] impala git commit: IMPALA-6383: free memory after skipping parquet row groups

IMPALA-6383: free memory after skipping parquet row groups

Before this patch, resources were flushed only after breaking out of
NextRowGroup(). This was a problem because resources can be allocated for
row groups that are then skipped (e.g. for reading dictionaries), so that
memory was held longer than necessary.
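
For context, a simplified sketch of the control flow this patch moves to
(condensed from the NextRowGroup() hunks in the diff below; error handling
and most initialization are elided, and skip_row_group stands in for the
several distinct skip conditions in the real code):

    // Sketch only -- condensed from the diff below, not the full method.
    Status HdfsParquetScanner::NextRowGroup() {
      while (true) {
        // No streams may be left over from the footer scan range or a
        // previous row group.
        DCHECK_EQ(0, context_->NumStreams());
        ++row_group_idx_;
        // InitScalarColumns() etc. may allocate streams and dictionary
        // memory for this candidate row group.
        if (skip_row_group) {
          // Free those allocations *now*, before moving on to the next
          // candidate, instead of only after breaking out of the loop.
          ReleaseSkippedRowGroupResources();
          continue;
        }
        break;  // Found a row group we can read.
      }
      return Status::OK();
    }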

Testing:
Tested in conjunction with a prototype buffer pool patch that hit a
DCHECK before this change.

Added DCHECKs to the current version to ensure the streams are cleaned
up as expected.

Ran the repro for IMPALA-6419 to confirm this iteration of the patch
fixed the original problem.

Change-Id: I95713675455f7635fa3f72616b166f35e2a46c1a
Reviewed-on: http://gerrit.cloudera.org:8080/9059
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c0c3ba7f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c0c3ba7f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c0c3ba7f

Branch: refs/heads/2.x
Commit: c0c3ba7f45d2897083c5d6d4442b95ea7da1c3e0
Parents: 42ce1f9
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jan 10 15:35:41 2018 -0800
Committer: Philip Zeyliger <ph...@cloudera.com>
Committed: Thu Jan 25 16:30:18 2018 -0800

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 52 +++++++++++++++++++++-----------
 be/src/exec/hdfs-parquet-scanner.h  |  5 +++
 be/src/exec/scanner-context.h       |  8 ++---
 3 files changed, 44 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c0c3ba7f/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 3a17a3b..c14edd7 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -229,6 +229,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
   context_->ReleaseCompletedResources(true);
+  context_->ClearStreams();
   RETURN_IF_ERROR(footer_status);
 
   // Parse the file schema into an internal representation for schema resolution.
@@ -264,7 +265,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
     }
   } else {
     template_tuple_pool_->FreeAll();
-    dictionary_pool_.get()->FreeAll();
+    dictionary_pool_->FreeAll();
     context_->ReleaseCompletedResources(true);
     for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
     // The scratch batch may still contain tuple data. We can get into this case if
@@ -479,7 +480,6 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
     // Transfer resources and clear streams if there is any leftover from the previous
     // row group. We will create new streams for the next row group.
     FlushRowGroupResources(row_batch);
-    context_->ClearStreams();
     if (!advance_row_group_) {
       Status status =
           ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_);
@@ -620,6 +620,9 @@ Status HdfsParquetScanner::NextRowGroup() {
   while (true) {
     // Reset the parse status for the next row group.
     parse_status_ = Status::OK();
+    // Make sure that we don't have leftover resources from the file metadata scan range
+    // or previous row groups.
+    DCHECK_EQ(0, context_->NumStreams());
 
     ++row_group_idx_;
     if (row_group_idx_ >= file_metadata_.row_groups.size()) {
@@ -672,6 +675,9 @@ Status HdfsParquetScanner::NextRowGroup() {
     // of the column.
     RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, dict_filterable_readers_));
 
+    // InitColumns() may have allocated resources to scan columns. If we skip this row
+    // group below, we must call ReleaseSkippedRowGroupResources() before continuing.
+
     // If there is a dictionary-encoded column where every value is eliminated
     // by a conjunct, the row group can be eliminated. This initializes dictionaries
     // for all columns visited.
@@ -680,10 +686,12 @@ Status HdfsParquetScanner::NextRowGroup() {
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
       RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+      ReleaseSkippedRowGroupResources();
       continue;
     }
     if (skip_row_group_on_dict_filters) {
       COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
+      ReleaseSkippedRowGroupResources();
       continue;
     }
 
@@ -695,10 +703,11 @@ Status HdfsParquetScanner::NextRowGroup() {
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
       RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+      ReleaseSkippedRowGroupResources();
       continue;
     }
 
-    bool seeding_ok = true;
+    bool seeding_failed = false;
     for (ParquetColumnReader* col_reader: column_readers_) {
       // Seed collection and boolean column readers with NextLevel().
       // The ScalarColumnReaders use an optimized ReadValueBatch() that
@@ -707,19 +716,21 @@ Status HdfsParquetScanner::NextRowGroup() {
       // ScalarColumnReader::ReadValueBatch() which does not need seeding. This
       // will allow better sharing of code between the row-wise and column-wise
       // materialization strategies.
-      if (col_reader->NeedsSeedingForBatchedReading()) {
-        if (!col_reader->NextLevels()) {
-          seeding_ok = false;
-          break;
-        }
+      if (col_reader->NeedsSeedingForBatchedReading()
+          && !col_reader->NextLevels()) {
+        seeding_failed = true;
+        break;
       }
-      DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
     }
-
-    if (!parse_status_.ok()) {
-      RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
-    } else if (seeding_ok) {
-      // Found a non-empty row group and successfully initialized the column readers.
+    if (seeding_failed) {
+      if (!parse_status_.ok()) {
+        RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
+      }
+      ReleaseSkippedRowGroupResources();
+      continue;
+    } else {
+      // Seeding succeeded - we're ready to read the row group.
+      DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
       break;
     }
   }
@@ -733,9 +744,16 @@ void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
   row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
   scratch_batch_->ReleaseResources(row_batch->tuple_data_pool());
   context_->ReleaseCompletedResources(true);
-  for (ParquetColumnReader* col_reader : column_readers_) {
-    col_reader->Close(row_batch);
-  }
+  for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(row_batch);
+  context_->ClearStreams();
+}
+
+void HdfsParquetScanner::ReleaseSkippedRowGroupResources() {
+  dictionary_pool_->FreeAll();
+  scratch_batch_->ReleaseResources(nullptr);
+  context_->ReleaseCompletedResources(true);
+  for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
+  context_->ClearStreams();
 }
 
 bool HdfsParquetScanner::IsDictFilterable(BaseScalarColumnReader* col_reader) {
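
A note on the two cleanup paths defined above: FlushRowGroupResources()
runs when rows from the row group were returned, so tuple memory that may
still be referenced (e.g. dictionary values) is transferred to the output
RowBatch, whereas ReleaseSkippedRowGroupResources() runs when no rows were
returned, so the same memory can be freed outright. A condensed,
non-verbatim contrast of the two functions:

    // Simplified from the hunk above; bodies abbreviated.
    void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
      // Rows escaped: hand dictionary memory to the batch's pool.
      row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
      // ... release stream resources, close readers into 'row_batch' ...
    }

    void HdfsParquetScanner::ReleaseSkippedRowGroupResources() {
      // No rows escaped: nothing can reference this memory, so free it.
      dictionary_pool_->FreeAll();
      // ... release stream resources, close readers with nullptr ...
    }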

http://git-wip-us.apache.org/repos/asf/impala/blob/c0c3ba7f/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 2ddf0fc..b1409d7 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -654,6 +654,11 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Should be called after completing a row group and when returning the last batch.
   void FlushRowGroupResources(RowBatch* row_batch);
 
+  /// Releases resources associated with a row group that was skipped and closes all
+  /// column readers. Should be called after skipping a row group from which no rows
+  /// were returned.
+  void ReleaseSkippedRowGroupResources();
+
   /// Evaluates whether the column reader is eligible for dictionary predicates
   bool IsDictFilterable(ParquetColumnReader* col_reader);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/c0c3ba7f/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index e316063..09a4bdc 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -89,7 +89,6 @@ class ScannerContext {
   ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
       io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool);
-
   /// Destructor verifies that all stream objects have been released.
   ~ScannerContext();
 
@@ -338,6 +337,8 @@ class ScannerContext {
     return streams_[idx].get();
   }
 
+  int NumStreams() const { return streams_.size(); }
+
   /// Release completed resources for all streams, e.g. the last buffer in each stream if
   /// the current read position is at the end of the buffer. If 'done' is true all
   /// resources are freed, even if the caller has not read that data yet. After calling
@@ -354,8 +355,8 @@ class ScannerContext {
   /// size to 0.
   void ClearStreams();
 
-  /// Add a stream to this ScannerContext for 'range'. Returns the added stream.
-  /// The stream is created in the runtime state's object pool
+  /// Add a stream to this ScannerContext for 'range'. The stream is owned by this
+  /// context.
   Stream* AddStream(io::ScanRange* range);
 
   /// Returns false if scan_node_ is multi-threaded and has been cancelled.
@@ -370,7 +371,6 @@ class ScannerContext {
 
   RuntimeState* state_;
   HdfsScanNodeBase* scan_node_;
-
   HdfsPartitionDescriptor* partition_desc_;
 
   /// Vector of streams. Non-columnar formats will always have one stream per context.
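
Taken together, the ScannerContext changes give the scanner both the reset
(ClearStreams()) and a way to assert that the reset actually happened. A
minimal sketch of how the new accessor pairs with the DCHECK added in
hdfs-parquet-scanner.cc above:

    // After a row group is flushed or skipped, its streams are cleared...
    context_->ClearStreams();
    // ...so each NextRowGroup() iteration can assert the invariant that
    // nothing leaked from the footer range or a previous row group.
    DCHECK_EQ(0, context_->NumStreams());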