You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/27 22:40:29 UTC
[2/2] impala git commit: IMPALA-7324: remove MarkNeedsDeepCopy() from
sorter
IMPALA-7324: remove MarkNeedsDeepCopy() from sorter
Testing:
Ran exhaustive tests.
Change-Id: If86f038f1f6ca81ad5c9d40af1b7ea6115144ffc
Reviewed-on: http://gerrit.cloudera.org:8080/11048
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b5608264
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b5608264
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b5608264
Branch: refs/heads/master
Commit: b5608264b4552e44eb73ded1e232a8775c3dba6b
Parents: 154c530
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Jul 23 23:29:04 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Jul 27 22:34:47 2018 +0000
----------------------------------------------------------------------
be/src/runtime/row-batch.cc | 10 ++--
be/src/runtime/row-batch.h | 10 ++--
be/src/runtime/sorted-run-merger.cc | 6 +--
be/src/runtime/sorted-run-merger.h | 9 ++--
be/src/runtime/sorter.cc | 83 +++++++++++++-------------------
5 files changed, 53 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index ca2a9f3..b5c51bb 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -44,7 +44,7 @@ const int RowBatch::FIXED_LEN_BUFFER_LIMIT;
RowBatch::RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* mem_tracker)
: num_rows_(0),
capacity_(capacity),
- flush_(FlushMode::NO_FLUSH_RESOURCES),
+ flush_mode_(FlushMode::NO_FLUSH_RESOURCES),
needs_deep_copy_(false),
num_tuples_per_row_(row_desc->tuple_descriptors().size()),
tuple_ptrs_size_(capacity * num_tuples_per_row_ * sizeof(Tuple*)),
@@ -71,7 +71,7 @@ RowBatch::RowBatch(
const RowDescriptor* row_desc, const TRowBatch& input_batch, MemTracker* mem_tracker)
: num_rows_(input_batch.num_rows),
capacity_(input_batch.num_rows),
- flush_(FlushMode::NO_FLUSH_RESOURCES),
+ flush_mode_(FlushMode::NO_FLUSH_RESOURCES),
needs_deep_copy_(false),
num_tuples_per_row_(input_batch.row_tuples.size()),
tuple_ptrs_size_(capacity_ * num_tuples_per_row_ * sizeof(Tuple*)),
@@ -108,7 +108,7 @@ RowBatch::RowBatch(const RowDescriptor* row_desc, const RowBatchHeaderPB& header
MemTracker* mem_tracker)
: num_rows_(0),
capacity_(0),
- flush_(FlushMode::NO_FLUSH_RESOURCES),
+ flush_mode_(FlushMode::NO_FLUSH_RESOURCES),
needs_deep_copy_(false),
num_tuples_per_row_(header.num_tuples_per_row()),
tuple_ptrs_size_(header.num_rows() * num_tuples_per_row_ * sizeof(Tuple*)),
@@ -436,7 +436,7 @@ void RowBatch::Reset() {
tuple_data_pool_.FreeAll();
FreeBuffers();
attached_buffer_bytes_ = 0;
- flush_ = FlushMode::NO_FLUSH_RESOURCES;
+ flush_mode_ = FlushMode::NO_FLUSH_RESOURCES;
needs_deep_copy_ = false;
}
@@ -449,7 +449,7 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) {
buffers_.clear();
if (needs_deep_copy_) {
dest->MarkNeedsDeepCopy();
- } else if (flush_ == FlushMode::FLUSH_RESOURCES) {
+ } else if (flush_mode_ == FlushMode::FLUSH_RESOURCES) {
dest->MarkFlushResources();
}
Reset();
http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 08df8a8..67adb9b 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -198,7 +198,7 @@ class RowBatch {
DCHECK_LE(num_rows_, capacity_);
// Check AtCapacity() condition enforced in MarkNeedsDeepCopy() and
// MarkFlushResources().
- DCHECK((!needs_deep_copy_ && flush_ == FlushMode::NO_FLUSH_RESOURCES)
+ DCHECK((!needs_deep_copy_ && flush_mode_ == FlushMode::NO_FLUSH_RESOURCES)
|| num_rows_ == capacity_);
int64_t mem_usage = attached_buffer_bytes_ + tuple_data_pool_.total_allocated_bytes();
return num_rows_ == capacity_ || mem_usage >= AT_CAPACITY_MEM_USAGE;
@@ -293,9 +293,11 @@ class RowBatch {
void MarkFlushResources() {
DCHECK_LE(num_rows_, capacity_);
capacity_ = num_rows_;
- flush_ = FlushMode::FLUSH_RESOURCES;
+ flush_mode_ = FlushMode::FLUSH_RESOURCES;
}
+ FlushMode flush_mode() const { return flush_mode_; }
+
/// Called to indicate that some resources backing this batch were not attached and
/// will be cleaned up after the next GetNext() call. This means that the batch must
/// be returned up the operator tree. Blocking operators must deep-copy any rows from
@@ -499,10 +501,10 @@ class RowBatch {
/// If FLUSH_RESOURCES, the resources attached to this batch should be freed or
/// acquired by a new owner as soon as possible. See MarkFlushResources(). If
/// FLUSH_RESOURCES, AtCapacity() is also true.
- FlushMode flush_;
+ FlushMode flush_mode_;
/// If true, this batch references unowned memory that will be cleaned up soon.
- /// See MarkNeedsDeepCopy(). If true, 'flush_' is FLUSH_RESOURCES and
+ /// See MarkNeedsDeepCopy(). If true, 'flush_mode_' is FLUSH_RESOURCES and
/// AtCapacity() is true.
bool needs_deep_copy_;
http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc
index 8f5ede9..64feeb7 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -65,9 +65,9 @@ class SortedRunMerger::SortedRunWrapper {
do {
// Make sure to transfer resources from every batch received from 'sorted_run_'.
if (transfer_batch != NULL) {
- DCHECK(!input_row_batch_->needs_deep_copy())
- << "Run batch suppliers that set the "
- "needs_deep_copy flag must use a deep-copying merger";
+ DCHECK_ENUM_EQ(
+ RowBatch::FlushMode::NO_FLUSH_RESOURCES, input_row_batch_->flush_mode())
+ << "Run batch suppliers that flush resources must use a deep-copying merger";
input_row_batch_->TransferResourceOwnership(transfer_batch);
}
http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/sorted-run-merger.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.h b/be/src/runtime/sorted-run-merger.h
index 99ffbe9..cfa07cb 100644
--- a/be/src/runtime/sorted-run-merger.h
+++ b/be/src/runtime/sorted-run-merger.h
@@ -43,10 +43,11 @@ class RuntimeProfile;
/// and transfers resource ownership from the input batches to the output batch when
/// an input batch is processed.
///
-/// SortedRunMerger cannot handle the 'needs_deep_copy' resource-transfer model so
-/// if the RunBatchSupplierFn can return batches with the 'needs_deep_copy' flag set,
-/// the merger must have 'deep_copy_input' set. TODO: once 'needs_deep_copy' is removed
-/// this is no longer a problem.
+/// SortedRunMerger cannot handle "flushing resources" so if the RunBatchSupplierFn
+/// can return batches with FLUSH_RESOURCES set, the merger must have 'deep_copy_input'
+/// set. This is because AdvanceMinRow() gets the next batch before freeing resources
+/// from the previous batch.
+/// TODO: it would be nice to fix this to avoid unnecessary copies.
class SortedRunMerger {
public:
/// Function that returns the next batch of rows from an input sorted run. The batch
http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index a8bab16..806a0b4 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -334,8 +334,8 @@ class Sorter::Run {
Status AddPage(vector<Page>* page_sequence) WARN_UNUSED_RESULT;
/// Advance to the next read page. If the run is pinned, has no effect. If the run
- /// is unpinned, atomically pin the page at 'page_index' + 1 in 'pages' and delete
- /// the page at 'page_index'.
+ /// is unpinned, the pin at 'page_index' was already attached to an output batch and
+ /// this function will pin the page at 'page_index' + 1 in 'pages'.
Status PinNextReadPage(vector<Page>* pages, int page_index) WARN_UNUSED_RESULT;
/// Copy the StringValues in 'var_values' to 'dest' in order and update the StringValue
@@ -918,22 +918,11 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
if (end_of_fixed_len_page_
&& fixed_len_pages_index_ >= static_cast<int>(fixed_len_pages_.size()) - 1) {
- if (is_pinned_) {
- // All pages were previously attached to output batches. GetNextBatch() assumes
- // that we don't attach resources to the batch on eos.
- DCHECK_EQ(NumOpenPages(fixed_len_pages_), 0);
- DCHECK_EQ(NumOpenPages(var_len_pages_), 0);
-
- // Flush resources in case we are in a subplan and need to allocate more pages
- // when the node is reopened.
- output_batch->MarkFlushResources();
- } else {
- // We held onto the last fixed or var len blocks without transferring them to the
- // caller. We signalled MarkNeedsDeepCopy() to the caller, so we can safely delete
- // them now to free memory.
- if (!fixed_len_pages_.empty()) DCHECK_EQ(NumOpenPages(fixed_len_pages_), 1);
- if (!var_len_pages_.empty()) DCHECK_EQ(NumOpenPages(var_len_pages_), 1);
- }
+ // All pages were previously attached to output batches. GetNextBatch() assumes
+ // that we don't attach resources to the batch on eos.
+ DCHECK_EQ(NumOpenPages(fixed_len_pages_), 0);
+ DCHECK_EQ(NumOpenPages(var_len_pages_), 0);
+
CloseAllPages();
*eos = true;
DCHECK_EQ(num_tuples_returned_, num_tuples_);
@@ -976,12 +965,16 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
if (CONVERT_OFFSET_TO_PTR && !ConvertOffsetsToPtrs(input_tuple)) {
DCHECK(!is_pinned_);
// The var-len data is in the next page. We are done with the current page, so
- // return rows we've accumulated so far and advance to the next page in the next
- // GetNext() call. This is needed for the unpinned case where we will exchange
- // this page for the next in the next GetNext() call. So therefore we must hold
- // onto the current var-len page and signal to the caller that the page is going
- // to be deleted.
- output_batch->MarkNeedsDeepCopy();
+ // return rows we've accumulated so far along with the page's buffer and advance
+ // to the next page in the next GetNext() call. We need the page's reservation to
+ // pin the next page, so flush resources.
+ DCHECK_GE(var_len_pages_index_, 0);
+ DCHECK_LT(var_len_pages_index_, var_len_pages_.size());
+ BufferPool::BufferHandle buffer =
+ var_len_pages_[var_len_pages_index_].ExtractBuffer(
+ sorter_->buffer_pool_client_);
+ output_batch->AddBuffer(sorter_->buffer_pool_client_, move(buffer),
+ RowBatch::FlushMode::FLUSH_RESOURCES);
end_of_var_len_page_ = true;
break;
}
@@ -993,27 +986,23 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
if (fixed_len_page_offset_ >= fixed_len_page->valid_data_len()) {
// Reached the page boundary, need to move to the next page.
- if (is_pinned_) {
- BufferPool::ClientHandle* client = sorter_->buffer_pool_client_;
- // Attach page to batch. Caller can delete the page when it wants to.
- output_batch->AddBuffer(client,
- fixed_len_pages_[fixed_len_pages_index_].ExtractBuffer(client),
- RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-
- // Attach the var-len pages at eos once no more rows will reference the pages.
- if (fixed_len_pages_index_ == fixed_len_pages_.size() - 1) {
- for (Page& var_len_page : var_len_pages_) {
- DCHECK(var_len_page.is_open());
+ BufferPool::ClientHandle* client = sorter_->buffer_pool_client_;
+ // Attach page to batch. For unpinned pages we need to flush resources so we can pin
+ // the next page on the next call to GetNext().
+ RowBatch::FlushMode flush = is_pinned_ ? RowBatch::FlushMode::NO_FLUSH_RESOURCES :
+ RowBatch::FlushMode::FLUSH_RESOURCES;
+ output_batch->AddBuffer(
+ client, fixed_len_pages_[fixed_len_pages_index_].ExtractBuffer(client), flush);
+ // Attach remaining var-len pages at eos once no more rows will reference the pages.
+ if (fixed_len_pages_index_ == fixed_len_pages_.size() - 1) {
+ for (Page& var_len_page : var_len_pages_) {
+ DCHECK(!is_pinned_ || var_len_page.is_open());
+ if (var_len_page.is_open()) {
output_batch->AddBuffer(client, var_len_page.ExtractBuffer(client),
RowBatch::FlushMode::NO_FLUSH_RESOURCES);
}
- var_len_pages_.clear();
}
- } else {
- // To iterate over unpinned runs, we need to exchange this page for the next
- // in the next GetNext() call, so we need to hold onto the page and signal to
- // the caller that the page is going to be deleted.
- output_batch->MarkNeedsDeepCopy();
+ var_len_pages_.clear();
}
end_of_fixed_len_page_ = true;
}
@@ -1027,14 +1016,10 @@ Status Sorter::Run::PinNextReadPage(vector<Page>* pages, int page_index) {
Page* curr_page = &(*pages)[page_index];
Page* next_page = &(*pages)[page_index + 1];
DCHECK_EQ(is_pinned_, next_page->is_pinned());
- if (is_pinned_) {
- // The current page was attached to a batch and 'next_page' is already pinned.
- DCHECK(!curr_page->is_open());
- return Status::OK();
- }
- // Close the previous page to free memory and pin the next page. Should always succeed
- // since the pages are the same size.
- curr_page->Close(sorter_->buffer_pool_client_);
+ // The current page was attached to a batch.
+ DCHECK(!curr_page->is_open());
+ // 'next_page' is already pinned if the whole stream is pinned.
+ if (is_pinned_) return Status::OK();
RETURN_IF_ERROR(next_page->Pin(sorter_->buffer_pool_client_));
return Status::OK();
}