Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/27 22:40:29 UTC

[2/2] impala git commit: IMPALA-7324: remove MarkNeedsDeepCopy() from sorter

IMPALA-7324: remove MarkNeedsDeepCopy() from sorter

Testing:
Ran exhaustive tests.

Change-Id: If86f038f1f6ca81ad5c9d40af1b7ea6115144ffc
Reviewed-on: http://gerrit.cloudera.org:8080/11048
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b5608264
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b5608264
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b5608264

Branch: refs/heads/master
Commit: b5608264b4552e44eb73ded1e232a8775c3dba6b
Parents: 154c530
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Jul 23 23:29:04 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Jul 27 22:34:47 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/row-batch.cc         | 10 ++--
 be/src/runtime/row-batch.h          | 10 ++--
 be/src/runtime/sorted-run-merger.cc |  6 +--
 be/src/runtime/sorted-run-merger.h  |  9 ++--
 be/src/runtime/sorter.cc            | 83 +++++++++++++-------------------
 5 files changed, 53 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
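
In short: the sorter no longer calls MarkNeedsDeepCopy() on its output batches.
Instead it attaches page buffers to the batch directly, with NO_FLUSH_RESOURCES
for pinned runs and FLUSH_RESOURCES for unpinned runs so that the page's
reservation can be reused to pin the next page. RowBatch's 'flush_' member is
renamed to 'flush_mode_' and exposed through a new flush_mode() accessor, which
SortedRunMerger's DCHECK now checks instead of the needs_deep_copy flag.

For orientation, here is a hedged consumer-side sketch of the two resource-transfer
contracts documented in row-batch.h. ChildGetNext() and DeepCopyRows() are assumed
helpers, not Impala APIs; only RowBatch, flush_mode(), needs_deep_copy() and
TransferResourceOwnership() come from the code below.

    // Hedged sketch: how a caller of GetNext() honours both contracts.
    Status ConsumeBatches(const RowDescriptor* row_desc, MemTracker* tracker,
        RowBatch* out_batch) {
      RowBatch batch(row_desc, /*capacity=*/1024, tracker);
      bool eos = false;
      while (!eos) {
        RETURN_IF_ERROR(ChildGetNext(&batch, &eos));        // assumed helper
        if (batch.needs_deep_copy()) {
          // Batch references unowned memory that will be cleaned up soon:
          // copy the rows into memory this operator owns before continuing.
          RETURN_IF_ERROR(DeepCopyRows(&batch, out_batch));  // assumed helper
        } else if (batch.flush_mode() == RowBatch::FlushMode::FLUSH_RESOURCES) {
          // Attached resources should be freed or acquired by a new owner as
          // soon as possible: hand them to the output batch.
          batch.TransferResourceOwnership(out_batch);
        }
        batch.Reset();
      }
      return Status::OK();
    }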


http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index ca2a9f3..b5c51bb 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -44,7 +44,7 @@ const int RowBatch::FIXED_LEN_BUFFER_LIMIT;
 RowBatch::RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* mem_tracker)
   : num_rows_(0),
     capacity_(capacity),
-    flush_(FlushMode::NO_FLUSH_RESOURCES),
+    flush_mode_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(row_desc->tuple_descriptors().size()),
     tuple_ptrs_size_(capacity * num_tuples_per_row_ * sizeof(Tuple*)),
@@ -71,7 +71,7 @@ RowBatch::RowBatch(
     const RowDescriptor* row_desc, const TRowBatch& input_batch, MemTracker* mem_tracker)
   : num_rows_(input_batch.num_rows),
     capacity_(input_batch.num_rows),
-    flush_(FlushMode::NO_FLUSH_RESOURCES),
+    flush_mode_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(input_batch.row_tuples.size()),
     tuple_ptrs_size_(capacity_ * num_tuples_per_row_ * sizeof(Tuple*)),
@@ -108,7 +108,7 @@ RowBatch::RowBatch(const RowDescriptor* row_desc, const RowBatchHeaderPB& header
     MemTracker* mem_tracker)
   : num_rows_(0),
     capacity_(0),
-    flush_(FlushMode::NO_FLUSH_RESOURCES),
+    flush_mode_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(header.num_tuples_per_row()),
     tuple_ptrs_size_(header.num_rows() * num_tuples_per_row_ * sizeof(Tuple*)),
@@ -436,7 +436,7 @@ void RowBatch::Reset() {
   tuple_data_pool_.FreeAll();
   FreeBuffers();
   attached_buffer_bytes_ = 0;
-  flush_ = FlushMode::NO_FLUSH_RESOURCES;
+  flush_mode_ = FlushMode::NO_FLUSH_RESOURCES;
   needs_deep_copy_ = false;
 }
 
@@ -449,7 +449,7 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) {
   buffers_.clear();
   if (needs_deep_copy_) {
     dest->MarkNeedsDeepCopy();
-  } else if (flush_ == FlushMode::FLUSH_RESOURCES) {
+  } else if (flush_mode_ == FlushMode::FLUSH_RESOURCES) {
     dest->MarkFlushResources();
   }
   Reset();

http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 08df8a8..67adb9b 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -198,7 +198,7 @@ class RowBatch {
     DCHECK_LE(num_rows_, capacity_);
     // Check AtCapacity() condition enforced in MarkNeedsDeepCopy() and
     // MarkFlushResources().
-    DCHECK((!needs_deep_copy_ && flush_ == FlushMode::NO_FLUSH_RESOURCES)
+    DCHECK((!needs_deep_copy_ && flush_mode_ == FlushMode::NO_FLUSH_RESOURCES)
         || num_rows_ == capacity_);
     int64_t mem_usage = attached_buffer_bytes_ + tuple_data_pool_.total_allocated_bytes();
     return num_rows_ == capacity_ || mem_usage >= AT_CAPACITY_MEM_USAGE;
@@ -293,9 +293,11 @@ class RowBatch {
   void MarkFlushResources() {
     DCHECK_LE(num_rows_, capacity_);
     capacity_ = num_rows_;
-    flush_ = FlushMode::FLUSH_RESOURCES;
+    flush_mode_ = FlushMode::FLUSH_RESOURCES;
   }
 
+  FlushMode flush_mode() const { return flush_mode_; }
+
   /// Called to indicate that some resources backing this batch were not attached and
   /// will be cleaned up after the next GetNext() call. This means that the batch must
   /// be returned up the operator tree. Blocking operators must deep-copy any rows from
@@ -499,10 +501,10 @@ class RowBatch {
   /// If FLUSH_RESOURCES, the resources attached to this batch should be freed or
   /// acquired by a new owner as soon as possible. See MarkFlushResources(). If
   /// FLUSH_RESOURCES, AtCapacity() is also true.
-  FlushMode flush_;
+  FlushMode flush_mode_;
 
   /// If true, this batch references unowned memory that will be cleaned up soon.
-  /// See MarkNeedsDeepCopy(). If true, 'flush_' is FLUSH_RESOURCES and
+  /// See MarkNeedsDeepCopy(). If true, 'flush_mode_' is FLUSH_RESOURCES and
   /// AtCapacity() is true.
   bool needs_deep_copy_;
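
To make the invariants above concrete, a minimal sketch (illustrative only:
'row_desc' and 'tracker' are assumed to be a valid RowDescriptor* and
MemTracker*, and a real producer would mark the batch only after filling it):

    // Minimal sketch of the FLUSH_RESOURCES invariants checked in AtCapacity().
    void FlushModeInvariantsSketch(const RowDescriptor* row_desc, MemTracker* tracker) {
      RowBatch batch(row_desc, /*capacity=*/1024, tracker);
      batch.MarkFlushResources();  // caps capacity_ at num_rows_
      DCHECK(batch.AtCapacity());  // now true, since num_rows_ == capacity_
      DCHECK_ENUM_EQ(RowBatch::FlushMode::FLUSH_RESOURCES, batch.flush_mode());
      // needs_deep_copy() stays false, so a downstream operator reacts by
      // freeing or taking over the attached resources, not by deep-copying rows.
    }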
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc
index 8f5ede9..64feeb7 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -65,9 +65,9 @@ class SortedRunMerger::SortedRunWrapper {
     do {
       // Make sure to transfer resources from every batch received from 'sorted_run_'.
       if (transfer_batch != NULL) {
-        DCHECK(!input_row_batch_->needs_deep_copy())
-            << "Run batch suppliers that set the "
-               "needs_deep_copy flag must use a deep-copying merger";
+        DCHECK_ENUM_EQ(
+            RowBatch::FlushMode::NO_FLUSH_RESOURCES, input_row_batch_->flush_mode())
+            << "Run batch suppliers that flush resources must use a deep-copying merger";
         input_row_batch_->TransferResourceOwnership(transfer_batch);
       }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/sorted-run-merger.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.h b/be/src/runtime/sorted-run-merger.h
index 99ffbe9..cfa07cb 100644
--- a/be/src/runtime/sorted-run-merger.h
+++ b/be/src/runtime/sorted-run-merger.h
@@ -43,10 +43,11 @@ class RuntimeProfile;
 /// and transfers resource ownership from the input batches to the output batch when
 /// an input batch is processed.
 ///
-/// SortedRunMerger cannot handle the 'needs_deep_copy' resource-transfer model so
-/// if the RunBatchSupplierFn can return batches with the 'needs_deep_copy' flag set,
-/// the merger must have 'deep_copy_input' set. TODO: once 'needs_deep_copy' is removed
-/// this is no longer a problem.
+/// SortedRunMerger cannot handle "flushing resources" so if the RunBatchSupplierFn
+/// can return batches with FLUSH_RESOURCES set, the merger must have 'deep_copy_input'
+/// set. This is because AdvanceMinRow() gets the next batch before freeing resources
+/// from the previous batch.
+/// TODO: it would be nice to fix this to avoid unnecessary copies.
 class SortedRunMerger {
  public:
   /// Function that returns the next batch of rows from an input sorted run. The batch
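
A hedged usage sketch of the rule above; apart from 'deep_copy_input' and the
FLUSH_RESOURCES rule, the constructor arguments are placeholders for whatever
SortedRunMerger actually takes:

    // Suppliers that can return FLUSH_RESOURCES batches (e.g. unpinned sorter
    // runs) must get a deep-copying merger; otherwise the DCHECK_ENUM_EQ added
    // in sorted-run-merger.cc fires on the first batch that flushes resources.
    std::unique_ptr<SortedRunMerger> MakeMergerFor(
        const TupleRowComparator& comparator, const RowDescriptor* row_desc,
        RuntimeProfile* profile, bool supplier_may_flush_resources) {
      return std::unique_ptr<SortedRunMerger>(new SortedRunMerger(
          comparator, row_desc, profile,
          /*deep_copy_input=*/supplier_may_flush_resources));
    }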

http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index a8bab16..806a0b4 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -334,8 +334,8 @@ class Sorter::Run {
   Status AddPage(vector<Page>* page_sequence) WARN_UNUSED_RESULT;
 
   /// Advance to the next read page. If the run is pinned, has no effect. If the run
-  /// is unpinned, atomically pin the page at 'page_index' + 1 in 'pages' and delete
-  /// the page at 'page_index'.
+  /// is unpinned, the pin at 'page_index' was already attached to an output batch and
+  /// this function will pin the page at 'page_index' + 1 in 'pages'.
   Status PinNextReadPage(vector<Page>* pages, int page_index) WARN_UNUSED_RESULT;
 
   /// Copy the StringValues in 'var_values' to 'dest' in order and update the StringValue
@@ -918,22 +918,11 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
 
   if (end_of_fixed_len_page_
       && fixed_len_pages_index_ >= static_cast<int>(fixed_len_pages_.size()) - 1) {
-    if (is_pinned_) {
-      // All pages were previously attached to output batches. GetNextBatch() assumes
-      // that we don't attach resources to the batch on eos.
-      DCHECK_EQ(NumOpenPages(fixed_len_pages_), 0);
-      DCHECK_EQ(NumOpenPages(var_len_pages_), 0);
-
-      // Flush resources in case we are in a subplan and need to allocate more pages
-      // when the node is reopened.
-      output_batch->MarkFlushResources();
-    } else {
-      // We held onto the last fixed or var len blocks without transferring them to the
-      // caller. We signalled MarkNeedsDeepCopy() to the caller, so we can safely delete
-      // them now to free memory.
-      if (!fixed_len_pages_.empty()) DCHECK_EQ(NumOpenPages(fixed_len_pages_), 1);
-      if (!var_len_pages_.empty()) DCHECK_EQ(NumOpenPages(var_len_pages_), 1);
-    }
+    // All pages were previously attached to output batches. GetNextBatch() assumes
+    // that we don't attach resources to the batch on eos.
+    DCHECK_EQ(NumOpenPages(fixed_len_pages_), 0);
+    DCHECK_EQ(NumOpenPages(var_len_pages_), 0);
+
     CloseAllPages();
     *eos = true;
     DCHECK_EQ(num_tuples_returned_, num_tuples_);
@@ -976,12 +965,16 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
     if (CONVERT_OFFSET_TO_PTR && !ConvertOffsetsToPtrs(input_tuple)) {
       DCHECK(!is_pinned_);
       // The var-len data is in the next page. We are done with the current page, so
-      // return rows we've accumulated so far and advance to the next page in the next
-      // GetNext() call. This is needed for the unpinned case where we will exchange
-      // this page for the next in the next GetNext() call. So therefore we must hold
-      // onto the current var-len page and signal to the caller that the page is going
-      // to be deleted.
-      output_batch->MarkNeedsDeepCopy();
+      // return rows we've accumulated so far along with the page's buffer and advance
+      // to the next page in the next GetNext() call. We need the page's reservation to
+      // pin the next page, so flush resources.
+      DCHECK_GE(var_len_pages_index_, 0);
+      DCHECK_LT(var_len_pages_index_, var_len_pages_.size());
+      BufferPool::BufferHandle buffer =
+          var_len_pages_[var_len_pages_index_].ExtractBuffer(
+              sorter_->buffer_pool_client_);
+      output_batch->AddBuffer(sorter_->buffer_pool_client_, move(buffer),
+          RowBatch::FlushMode::FLUSH_RESOURCES);
       end_of_var_len_page_ = true;
       break;
     }
@@ -993,27 +986,23 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
 
   if (fixed_len_page_offset_ >= fixed_len_page->valid_data_len()) {
     // Reached the page boundary, need to move to the next page.
-    if (is_pinned_) {
-      BufferPool::ClientHandle* client = sorter_->buffer_pool_client_;
-      // Attach page to batch. Caller can delete the page when it wants to.
-      output_batch->AddBuffer(client,
-          fixed_len_pages_[fixed_len_pages_index_].ExtractBuffer(client),
-          RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-
-      // Attach the var-len pages at eos once no more rows will reference the pages.
-      if (fixed_len_pages_index_ == fixed_len_pages_.size() - 1) {
-        for (Page& var_len_page : var_len_pages_) {
-          DCHECK(var_len_page.is_open());
+    BufferPool::ClientHandle* client = sorter_->buffer_pool_client_;
+    // Attach page to batch. For unpinned pages we need to flush resources so we can pin
+    // the next page on the next call to GetNext().
+    RowBatch::FlushMode flush = is_pinned_ ? RowBatch::FlushMode::NO_FLUSH_RESOURCES :
+                                             RowBatch::FlushMode::FLUSH_RESOURCES;
+    output_batch->AddBuffer(
+        client, fixed_len_pages_[fixed_len_pages_index_].ExtractBuffer(client), flush);
+    // Attach remaining var-len pages at eos once no more rows will reference the pages.
+    if (fixed_len_pages_index_ == fixed_len_pages_.size() - 1) {
+      for (Page& var_len_page : var_len_pages_) {
+        DCHECK(!is_pinned_ || var_len_page.is_open());
+        if (var_len_page.is_open()) {
           output_batch->AddBuffer(client, var_len_page.ExtractBuffer(client),
               RowBatch::FlushMode::NO_FLUSH_RESOURCES);
         }
-        var_len_pages_.clear();
       }
-    } else {
-      // To iterate over unpinned runs, we need to exchange this page for the next
-      // in the next GetNext() call, so we need to hold onto the page and signal to
-      // the caller that the page is going to be deleted.
-      output_batch->MarkNeedsDeepCopy();
+      var_len_pages_.clear();
     }
     end_of_fixed_len_page_ = true;
   }
@@ -1027,14 +1016,10 @@ Status Sorter::Run::PinNextReadPage(vector<Page>* pages, int page_index) {
   Page* curr_page = &(*pages)[page_index];
   Page* next_page = &(*pages)[page_index + 1];
   DCHECK_EQ(is_pinned_, next_page->is_pinned());
-  if (is_pinned_) {
-    // The current page was attached to a batch and 'next_page' is already pinned.
-    DCHECK(!curr_page->is_open());
-    return Status::OK();
-  }
-  // Close the previous page to free memory and pin the next page. Should always succeed
-  // since the pages are the same size.
-  curr_page->Close(sorter_->buffer_pool_client_);
+  // The current page was attached to a batch.
+  DCHECK(!curr_page->is_open());
+  // 'next_page' is already pinned if the whole stream is pinned.
+  if (is_pinned_) return Status::OK();
   RETURN_IF_ERROR(next_page->Pin(sorter_->buffer_pool_client_));
   return Status::OK();
 }
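
Taken together, the unpinned read path now follows a flush-then-pin cycle
instead of holding the page open and calling MarkNeedsDeepCopy(). A schematic
sketch of the call pattern (illustrative only: Run is internal to the sorter
and ProcessRows() is an assumed stand-in for the caller's work):

    Status ReadUnpinnedRun(Sorter::Run* run, RowBatch* batch) {
      bool eos = false;
      while (!eos) {
        // May attach the exhausted page's buffer to 'batch' with FLUSH_RESOURCES.
        RETURN_IF_ERROR(run->GetNext(batch, &eos));
        ProcessRows(batch);  // assumed placeholder
        // Resetting the batch frees the attached buffer and returns its
        // reservation to the sorter's buffer pool client...
        batch->Reset();
        // ...which is what lets PinNextReadPage() pin the page at
        // 'page_index' + 1 on the next GetNext() call (the pages are the same
        // size, so the freed reservation is exactly enough).
      }
      return Status::OK();
    }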