You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/23 15:40:28 UTC
[02/17] incubator-impala git commit: IMPALA-3332: Free local allocations in sorter.
IMPALA-3332: Free local allocations in sorter.
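Sorter can have runaway memory consumption because it never frees
local allocations made in comparator_.Less(). In addition, it
doesn't check for errors generated during expression evaluation,
so it may keep sorting even after failures have occurred.
This change fixes the problem by freeing local allocations once
every n invocations of comparator_.Less(), where n is the row
batch size specified in the query options. Various error checks
are also added to return early if any error is encountered:
TupleSorter::Sort() and its helpers now return a Status and check
for query cancellation and a non-OK query status. SortedRunMerger::GetNext()
also frees the comparator's local allocations after each output
batch, and the spilling tests re-enable their 200m mem_limit.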
Change-Id: I941729b4836e5dbb827d4313a0b45bc5df2fa8e1
Reviewed-on: http://gerrit.cloudera.org:8080/3116
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f7501d2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f7501d2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f7501d2e
Branch: refs/heads/master
Commit: f7501d2ec18feb2fd8e12cd7de3b9b9726085b54
Parents: 38416ee
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed May 18 10:56:52 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Mon May 23 08:40:18 2016 -0700
----------------------------------------------------------------------
be/src/runtime/sorted-run-merger.cc | 2 +
be/src/runtime/sorter.cc | 117 ++++++++++++-------
be/src/util/tuple-row-compare.h | 16 ++-
.../queries/QueryTest/spilling.test | 8 +-
4 files changed, 91 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7501d2e/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc
index ea7a689..0376437 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -180,6 +180,8 @@ Status SortedRunMerger::GetNext(RowBatch* output_batch, bool* eos) {
Heapify(0);
}
+ // Free local allocations made by comparator_.Less();
+ comparator_.FreeLocalAllocations();
*eos = min_heap_.empty();
return Status::OK();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7501d2e/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 96c27df..e0d388a 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -227,10 +227,9 @@ class Sorter::TupleSorter {
~TupleSorter();
/// Performs a quicksort for tuples in 'run' followed by an insertion sort to
- /// finish smaller blocks.
- /// Returns early if stste_->is_cancelled() is true. No status
- /// is returned - the caller must check for cancellation.
- void Sort(Run* run);
+ /// finish smaller blocks. Returns an error status if any error is encountered or
+ /// if the query is cancelled.
+ Status Sort(Run* run);
private:
static const int INSERTION_THRESHOLD = 16;
@@ -266,6 +265,12 @@ class Sorter::TupleSorter {
current_tuple_ = buffer_start_ + block_offset + past_end_bytes;
}
+ /// Default constructor used for local variable.
+ TupleIterator()
+ : parent_(NULL),
+ index_(-1),
+ current_tuple_(NULL) { }
+
/// Sets 'current_tuple_' to point to the next tuple in the run. Increments 'block_index_'
/// and advances to the next block if the next tuple is in the next block.
/// Can be advanced one past the last tuple in the run, but is not valid to
@@ -327,6 +332,10 @@ class Sorter::TupleSorter {
/// Tuple comparator with method Less() that returns true if lhs < rhs.
const TupleRowComparator comparator_;
+ /// Number of times comparator_.Less() can be invoked again before
+ /// comparator_.FreeLocalAllocations() needs to be called.
+ int num_comparisons_till_free_;
+
/// Runtime state instance to check for cancellation. Not owned.
RuntimeState* const state_;
@@ -344,20 +353,27 @@ class Sorter::TupleSorter {
/// high: Mersenne Twister should be more than adequate.
mt19937_64 rng_;
- /// Perform an insertion sort for rows in the range [first, last) in a run.
- void InsertionSort(const TupleIterator& first, const TupleIterator& last);
+ /// Wrapper around comparator_.Less(). Also call comparator_.FreeLocalAllocations()
+ /// on every 'state_->batch_size()' invocations of comparator_.Less(). Returns true
+ /// if 'lhs' is less than 'rhs'.
+ bool Less(TupleRow* lhs, TupleRow* rhs);
+
+ /// Performs an insertion sort for rows in the range [first, last) in a run.
+ /// Returns an error status if there is any error or if the query is cancelled.
+ Status InsertionSort(const TupleIterator& first, const TupleIterator& last);
/// Partitions the sequence of tuples in the range [first, last) in a run into two
/// groups around the pivot tuple - i.e. tuples in first group are <= the pivot, and
/// tuples in the second group are >= pivot. Tuples are swapped in place to create the
- /// groups and the index to the first element in the second group is returned.
- /// Checks state_->is_cancelled() and returns early with an invalid result if true.
- TupleIterator Partition(TupleIterator first, TupleIterator last, Tuple* pivot);
+ /// groups and the index to the first element in the second group is returned in 'cut'.
+ /// Return an error status if any error is encountered or if the query is cancelled.
+ Status Partition(TupleIterator first, TupleIterator last, Tuple* pivot,
+ TupleIterator* cut);
/// Performs a quicksort of rows in the range [first, last) followed by insertion sort
- /// for smaller groups of elements.
- /// Checks state_->is_cancelled() and returns early if true.
- void SortHelper(TupleIterator first, TupleIterator last);
+ /// for smaller groups of elements. Return an error status for any errors or if the
+ /// query is cancelled.
+ Status SortHelper(TupleIterator first, TupleIterator last);
/// Select a pivot to partition [first, last).
Tuple* SelectPivot(TupleIterator first, TupleIterator last);
@@ -582,7 +598,7 @@ Status Sorter::Run::UnpinAllBlocks() {
cur_sorted_var_len_block = sorted_var_len_blocks.back();
}
uint8_t* var_data_ptr =
- cur_sorted_var_len_block->Allocate<uint8_t>(total_var_len);
+ cur_sorted_var_len_block->Allocate<uint8_t>(total_var_len);
DCHECK_EQ(sorted_var_len_blocks.back(), cur_sorted_var_len_block);
CopyVarLenDataConvertOffset(string_values, sorted_var_len_blocks.size() - 1,
reinterpret_cast<uint8_t*>(cur_sorted_var_len_block->buffer()), var_data_ptr);
@@ -863,6 +879,7 @@ Sorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t block_s
block_capacity_(block_size / tuple_size),
last_tuple_block_offset_(tuple_size * ((block_size / tuple_size) - 1)),
comparator_(comp),
+ num_comparisons_till_free_(state->batch_size()),
state_(state) {
temp_tuple_buffer_ = new uint8_t[tuple_size];
temp_tuple_row_ = reinterpret_cast<TupleRow*>(&temp_tuple_buffer_);
@@ -874,10 +891,22 @@ Sorter::TupleSorter::~TupleSorter() {
delete[] swap_buffer_;
}
-void Sorter::TupleSorter::Sort(Run* run) {
+Status Sorter::TupleSorter::Sort(Run* run) {
run_ = run;
- SortHelper(TupleIterator(this, 0), TupleIterator(this, run_->num_tuples_));
+ RETURN_IF_ERROR(
+ SortHelper(TupleIterator(this, 0), TupleIterator(this, run_->num_tuples_)));
run->is_sorted_ = true;
+ return Status::OK();
+}
+
+bool Sorter::TupleSorter::Less(TupleRow* lhs, TupleRow* rhs) {
+ --num_comparisons_till_free_;
+ DCHECK_GE(num_comparisons_till_free_, 0);
+ if (UNLIKELY(num_comparisons_till_free_ == 0)) {
+ comparator_.FreeLocalAllocations();
+ num_comparisons_till_free_ = state_->batch_size();
+ }
+ return comparator_.Less(lhs, rhs);
}
// Sort the sequence of tuples from [first, last).
@@ -886,7 +915,7 @@ void Sorter::TupleSorter::Sort(Run* run) {
// the sorted sequence by comparing it to each element of the sorted sequence
// (reverse order) to find its correct place in the sorted sequence, copying tuples
// along the way.
-void Sorter::TupleSorter::InsertionSort(const TupleIterator& first,
+Status Sorter::TupleSorter::InsertionSort(const TupleIterator& first,
const TupleIterator& last) {
TupleIterator insert_iter = first;
insert_iter.Next();
@@ -902,8 +931,7 @@ void Sorter::TupleSorter::InsertionSort(const TupleIterator& first,
TupleIterator iter = insert_iter;
iter.Prev();
uint8_t* copy_to = insert_iter.current_tuple_;
- while (comparator_.Less(
- temp_tuple_row_, reinterpret_cast<TupleRow*>(&iter.current_tuple_))) {
+ while (Less(temp_tuple_row_, reinterpret_cast<TupleRow*>(&iter.current_tuple_))) {
memcpy(copy_to, iter.current_tuple_, tuple_size_);
copy_to = iter.current_tuple_;
// Break if 'iter' has reached the first row, meaning that temp_tuple_row_
@@ -914,22 +942,23 @@ void Sorter::TupleSorter::InsertionSort(const TupleIterator& first,
memcpy(copy_to, temp_tuple_buffer_, tuple_size_);
}
+ RETURN_IF_CANCELLED(state_);
+ RETURN_IF_ERROR(state_->GetQueryStatus());
+ return Status::OK();
}
-Sorter::TupleSorter::TupleIterator Sorter::TupleSorter::Partition(TupleIterator first,
- TupleIterator last, Tuple* pivot) {
+Status Sorter::TupleSorter::Partition(TupleIterator first, TupleIterator last,
+ Tuple* pivot, Sorter::TupleSorter::TupleIterator* cut) {
// Copy pivot into temp_tuple since it points to a tuple within [first, last).
memcpy(temp_tuple_buffer_, pivot, tuple_size_);
last.Prev();
while (true) {
// Search for the first and last out-of-place elements, and swap them.
- while (comparator_.Less(
- reinterpret_cast<TupleRow*>(&first.current_tuple_), temp_tuple_row_)) {
+ while (Less(reinterpret_cast<TupleRow*>(&first.current_tuple_), temp_tuple_row_)) {
first.Next();
}
- while (comparator_.Less(
- temp_tuple_row_, reinterpret_cast<TupleRow*>(&last.current_tuple_))) {
+ while (Less(temp_tuple_row_, reinterpret_cast<TupleRow*>(&last.current_tuple_))) {
last.Prev();
}
@@ -939,38 +968,43 @@ Sorter::TupleSorter::TupleIterator Sorter::TupleSorter::Partition(TupleIterator
first.Next();
last.Prev();
+
+ RETURN_IF_CANCELLED(state_);
+ RETURN_IF_ERROR(state_->GetQueryStatus());
}
- return first;
+ *cut = first;
+ return Status::OK();
}
-void Sorter::TupleSorter::SortHelper(TupleIterator first, TupleIterator last) {
- if (UNLIKELY(state_->is_cancelled())) return;
+Status Sorter::TupleSorter::SortHelper(TupleIterator first, TupleIterator last) {
// Use insertion sort for smaller sequences.
while (last.index_ - first.index_ > INSERTION_THRESHOLD) {
+ // Select a pivot and call Partition() to split the tuples in [first, last) into two
+ // groups (<= pivot and >= pivot) in-place. 'cut' is the index of the first tuple in
+ // the second group.
Tuple* pivot = SelectPivot(first, last);
-
- // Partition() splits the tuples in [first, last) into two groups (<= pivot
- // and >= pivot) in-place. 'cut' is the index of the first tuple in the second group.
- TupleIterator cut = Partition(first, last, pivot);
+ TupleIterator cut;
+ RETURN_IF_ERROR(Partition(first, last, pivot, &cut));
// Recurse on the smaller partition. This limits stack size to log(n) stack frames.
if (cut.index_ - first.index_ < last.index_ - cut.index_) {
// Left partition is smaller.
- SortHelper(first, cut);
+ RETURN_IF_ERROR(SortHelper(first, cut));
first = cut;
} else {
// Right partition is equal or smaller.
- SortHelper(cut, last);
+ RETURN_IF_ERROR(SortHelper(cut, last));
last = cut;
}
- SortHelper(cut, last);
+ RETURN_IF_ERROR(SortHelper(cut, last));
last = cut;
- if (UNLIKELY(state_->is_cancelled())) return;
- }
- InsertionSort(first, last);
+ RETURN_IF_CANCELLED(state_);
+ }
+ RETURN_IF_ERROR(InsertionSort(first, last));
+ return Status::OK();
}
Tuple* Sorter::TupleSorter::SelectPivot(TupleIterator first, TupleIterator last) {
@@ -1002,9 +1036,9 @@ Tuple* Sorter::TupleSorter::MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3) {
TupleRow* tr2 = reinterpret_cast<TupleRow*>(&t2);
TupleRow* tr3 = reinterpret_cast<TupleRow*>(&t3);
- bool t1_lt_t2 = comparator_.Less(tr1, tr2);
- bool t2_lt_t3 = comparator_.Less(tr2, tr3);
- bool t1_lt_t3 = comparator_.Less(tr1, tr3);
+ bool t1_lt_t2 = Less(tr1, tr2);
+ bool t2_lt_t3 = Less(tr2, tr3);
+ bool t1_lt_t3 = Less(tr1, tr3);
if (t1_lt_t2) {
// t1 < t2
@@ -1222,8 +1256,7 @@ Status Sorter::SortRun() {
}
{
SCOPED_TIMER(in_mem_sort_timer_);
- in_mem_tuple_sorter_->Sort(unsorted_run_);
- RETURN_IF_CANCELLED(state_);
+ RETURN_IF_ERROR(in_mem_tuple_sorter_->Sort(unsorted_run_));
}
sorted_runs_.push_back(unsorted_run_);
unsorted_run_ = NULL;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7501d2e/be/src/util/tuple-row-compare.h
----------------------------------------------------------------------
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index 41035df..8505f19 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -102,7 +102,7 @@ class TupleRowComparator {
if (lhs_value != NULL && rhs_value == NULL) return -nulls_first_[i];
int result = RawValue::Compare(lhs_value, rhs_value,
- key_expr_ctxs_lhs_[i]->root()->type());
+ key_expr_ctxs_lhs_[i]->root()->type());
if (!is_asc_[i]) result = -result;
if (result != 0) return result;
// Otherwise, try the next Expr
@@ -126,7 +126,17 @@ class TupleRowComparator {
return Less(lhs_row, rhs_row);
}
+ /// Free any local allocations made during expression evaluations in Compare().
+ void FreeLocalAllocations() const {
+ ExprContext::FreeLocalAllocations(key_expr_ctxs_lhs_);
+ ExprContext::FreeLocalAllocations(key_expr_ctxs_rhs_);
+ }
+
private:
+ /// Codegen Compare(). Returns a non-OK status if codegen is unsuccessful.
+ /// TODO: have codegen'd users inline this instead of calling through the () operator
+ Status CodegenCompare(RuntimeState* state, llvm::Function** fn);
+
const std::vector<ExprContext*>& key_expr_ctxs_lhs_;
const std::vector<ExprContext*>& key_expr_ctxs_rhs_;
std::vector<bool> is_asc_;
@@ -142,10 +152,6 @@ class TupleRowComparator {
typedef int (*CompareFn)(ExprContext* const*, ExprContext* const*, TupleRow*,
TupleRow*);
CompareFn* codegend_compare_fn_;
-
- /// Codegen Compare(). Returns a non-OK status if codegen is unsuccessful.
- /// TODO: have codegen'd users inline this instead of calling through the () operator
- Status CodegenCompare(RuntimeState* state, llvm::Function** fn);
};
/// Compares the equality of two Tuples, going slot by slot.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7501d2e/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index a29c6c7..c35ddec 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -393,11 +393,10 @@ row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
---- QUERY
# Test sort with inlined char column materialized by exprs.
# Set low memory limit to force spilling.
+# IMPALA-3332: comparator makes local allocations that cause runaway memory consumption.
set num_nodes=0;
set max_block_mgr_memory=4m;
-# IMPALA-3332: comparator makes local allocations that cause runaway memory consumption.
-# When IMPALA-3332 is fixed, can reenable this memory limit.
-#set mem_limit=200m;
+set mem_limit=200m;
set disable_outermost_topn=1;
select cast(l_comment as char(50))
from lineitem
@@ -513,8 +512,7 @@ row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\)
set num_nodes=0;
set max_block_mgr_memory=4m;
# IMPALA-3332: comparator makes local allocations that cause runaway memory consumption.
-# When IMPALA-3332 is fixed, can reenable this memory limit.
-#set mem_limit=200m;
+set mem_limit=200m;
set disable_outermost_topn=1;
select cast(l_comment as varchar(50))
from lineitem