You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/23 15:40:28 UTC

[02/17] incubator-impala git commit: IMPALA-3332: Free local allocations in sorter.

IMPALA-3332: Free local allocations in sorter.

The sorter can have runaway memory consumption because it never frees
local allocations made in comparator_.Less(). In addition, it
doesn't check for errors generated during expression evaluation,
so it may keep sorting even after failures have occurred.

This change fixes the problem by freeing local allocations once
every n invocations of comparator_.Less(), where n is the row
batch size specified in the query options. SortedRunMerger also
frees them after producing each output batch. Various error checks
are also added so that sorting returns early if any error is encountered.

Change-Id: I941729b4836e5dbb827d4313a0b45bc5df2fa8e1
Reviewed-on: http://gerrit.cloudera.org:8080/3116
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f7501d2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f7501d2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f7501d2e

Branch: refs/heads/master
Commit: f7501d2ec18feb2fd8e12cd7de3b9b9726085b54
Parents: 38416ee
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed May 18 10:56:52 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Mon May 23 08:40:18 2016 -0700

----------------------------------------------------------------------
 be/src/runtime/sorted-run-merger.cc             |   2 +
 be/src/runtime/sorter.cc                        | 117 ++++++++++++-------
 be/src/util/tuple-row-compare.h                 |  16 ++-
 .../queries/QueryTest/spilling.test             |   8 +-
 4 files changed, 91 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7501d2e/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc
index ea7a689..0376437 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -180,6 +180,8 @@ Status SortedRunMerger::GetNext(RowBatch* output_batch, bool* eos) {
 
     Heapify(0);
   }
+  // Free local allocations made by comparator_.Less();
+  comparator_.FreeLocalAllocations();
 
   *eos = min_heap_.empty();
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7501d2e/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 96c27df..e0d388a 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -227,10 +227,9 @@ class Sorter::TupleSorter {
   ~TupleSorter();
 
   /// Performs a quicksort for tuples in 'run' followed by an insertion sort to
-  /// finish smaller blocks.
-  /// Returns early if stste_->is_cancelled() is true. No status
-  /// is returned - the caller must check for cancellation.
-  void Sort(Run* run);
+  /// finish smaller blocks. Returns an error status if any error is encountered or
+  /// if the query is cancelled.
+  Status Sort(Run* run);
 
  private:
   static const int INSERTION_THRESHOLD = 16;
@@ -266,6 +265,12 @@ class Sorter::TupleSorter {
       current_tuple_ = buffer_start_ + block_offset + past_end_bytes;
     }
 
+    /// Default constructor used for local variable.
+    TupleIterator()
+      : parent_(NULL),
+        index_(-1),
+        current_tuple_(NULL) { }
+
     /// Sets 'current_tuple_' to point to the next tuple in the run. Increments 'block_index_'
     /// and advances to the next block if the next tuple is in the next block.
     /// Can be advanced one past the last tuple in the run, but is not valid to
@@ -327,6 +332,10 @@ class Sorter::TupleSorter {
   /// Tuple comparator with method Less() that returns true if lhs < rhs.
   const TupleRowComparator comparator_;
 
+  /// Number of times comparator_.Less() can be invoked again before
+  /// comparator_.FreeLocalAllocations() needs to be called.
+  int num_comparisons_till_free_;
+
   /// Runtime state instance to check for cancellation. Not owned.
   RuntimeState* const state_;
 
@@ -344,20 +353,27 @@ class Sorter::TupleSorter {
   /// high: Mersenne Twister should be more than adequate.
   mt19937_64 rng_;
 
-  /// Perform an insertion sort for rows in the range [first, last) in a run.
-  void InsertionSort(const TupleIterator& first, const TupleIterator& last);
+  /// Wrapper around comparator_.Less(). Also call comparator_.FreeLocalAllocations()
+  /// on every 'state_->batch_size()' invocations of comparator_.Less(). Returns true
+  /// if 'lhs' is less than 'rhs'.
+  bool Less(TupleRow* lhs, TupleRow* rhs);
+
+  /// Performs an insertion sort for rows in the range [first, last) in a run.
+  /// Returns an error status if there is any error or if the query is cancelled.
+  Status InsertionSort(const TupleIterator& first, const TupleIterator& last);
 
   /// Partitions the sequence of tuples in the range [first, last) in a run into two
   /// groups around the pivot tuple - i.e. tuples in first group are <= the pivot, and
   /// tuples in the second group are >= pivot. Tuples are swapped in place to create the
-  /// groups and the index to the first element in the second group is returned.
-  /// Checks state_->is_cancelled() and returns early with an invalid result if true.
-  TupleIterator Partition(TupleIterator first, TupleIterator last, Tuple* pivot);
+  /// groups and the index to the first element in the second group is returned in 'cut'.
+  /// Return an error status if any error is encountered or if the query is cancelled.
+  Status Partition(TupleIterator first, TupleIterator last, Tuple* pivot,
+      TupleIterator* cut);
 
   /// Performs a quicksort of rows in the range [first, last) followed by insertion sort
-  /// for smaller groups of elements.
-  /// Checks state_->is_cancelled() and returns early if true.
-  void SortHelper(TupleIterator first, TupleIterator last);
+  /// for smaller groups of elements. Return an error status for any errors or if the
+  /// query is cancelled.
+  Status SortHelper(TupleIterator first, TupleIterator last);
 
   /// Select a pivot to partition [first, last).
   Tuple* SelectPivot(TupleIterator first, TupleIterator last);
@@ -582,7 +598,7 @@ Status Sorter::Run::UnpinAllBlocks() {
           cur_sorted_var_len_block = sorted_var_len_blocks.back();
         }
         uint8_t* var_data_ptr =
-          cur_sorted_var_len_block->Allocate<uint8_t>(total_var_len);
+            cur_sorted_var_len_block->Allocate<uint8_t>(total_var_len);
         DCHECK_EQ(sorted_var_len_blocks.back(), cur_sorted_var_len_block);
         CopyVarLenDataConvertOffset(string_values, sorted_var_len_blocks.size() - 1,
             reinterpret_cast<uint8_t*>(cur_sorted_var_len_block->buffer()), var_data_ptr);
@@ -863,6 +879,7 @@ Sorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t block_s
     block_capacity_(block_size / tuple_size),
     last_tuple_block_offset_(tuple_size * ((block_size / tuple_size) - 1)),
     comparator_(comp),
+    num_comparisons_till_free_(state->batch_size()),
     state_(state) {
   temp_tuple_buffer_ = new uint8_t[tuple_size];
   temp_tuple_row_ = reinterpret_cast<TupleRow*>(&temp_tuple_buffer_);
@@ -874,10 +891,22 @@ Sorter::TupleSorter::~TupleSorter() {
   delete[] swap_buffer_;
 }
 
-void Sorter::TupleSorter::Sort(Run* run) {
+Status Sorter::TupleSorter::Sort(Run* run) {
   run_ = run;
-  SortHelper(TupleIterator(this, 0), TupleIterator(this, run_->num_tuples_));
+  RETURN_IF_ERROR(
+      SortHelper(TupleIterator(this, 0), TupleIterator(this, run_->num_tuples_)));
   run->is_sorted_ = true;
+  return Status::OK();
+}
+
+bool Sorter::TupleSorter::Less(TupleRow* lhs, TupleRow* rhs) {
+  --num_comparisons_till_free_;
+  DCHECK_GE(num_comparisons_till_free_, 0);
+  if (UNLIKELY(num_comparisons_till_free_ == 0)) {
+    comparator_.FreeLocalAllocations();
+    num_comparisons_till_free_ = state_->batch_size();
+  }
+  return comparator_.Less(lhs, rhs);
 }
 
 // Sort the sequence of tuples from [first, last).
@@ -886,7 +915,7 @@ void Sorter::TupleSorter::Sort(Run* run) {
 // the sorted sequence by comparing it to each element of the sorted sequence
 // (reverse order) to find its correct place in the sorted sequence, copying tuples
 // along the way.
-void Sorter::TupleSorter::InsertionSort(const TupleIterator& first,
+Status Sorter::TupleSorter::InsertionSort(const TupleIterator& first,
     const TupleIterator& last) {
   TupleIterator insert_iter = first;
   insert_iter.Next();
@@ -902,8 +931,7 @@ void Sorter::TupleSorter::InsertionSort(const TupleIterator& first,
     TupleIterator iter = insert_iter;
     iter.Prev();
     uint8_t* copy_to = insert_iter.current_tuple_;
-    while (comparator_.Less(
-        temp_tuple_row_, reinterpret_cast<TupleRow*>(&iter.current_tuple_))) {
+    while (Less(temp_tuple_row_, reinterpret_cast<TupleRow*>(&iter.current_tuple_))) {
       memcpy(copy_to, iter.current_tuple_, tuple_size_);
       copy_to = iter.current_tuple_;
       // Break if 'iter' has reached the first row, meaning that temp_tuple_row_
@@ -914,22 +942,23 @@ void Sorter::TupleSorter::InsertionSort(const TupleIterator& first,
 
     memcpy(copy_to, temp_tuple_buffer_, tuple_size_);
   }
+  RETURN_IF_CANCELLED(state_);
+  RETURN_IF_ERROR(state_->GetQueryStatus());
+  return Status::OK();
 }
 
-Sorter::TupleSorter::TupleIterator Sorter::TupleSorter::Partition(TupleIterator first,
-    TupleIterator last, Tuple* pivot) {
+Status Sorter::TupleSorter::Partition(TupleIterator first, TupleIterator last,
+    Tuple* pivot, Sorter::TupleSorter::TupleIterator* cut) {
   // Copy pivot into temp_tuple since it points to a tuple within [first, last).
   memcpy(temp_tuple_buffer_, pivot, tuple_size_);
 
   last.Prev();
   while (true) {
     // Search for the first and last out-of-place elements, and swap them.
-    while (comparator_.Less(
-        reinterpret_cast<TupleRow*>(&first.current_tuple_), temp_tuple_row_)) {
+    while (Less(reinterpret_cast<TupleRow*>(&first.current_tuple_), temp_tuple_row_)) {
       first.Next();
     }
-    while (comparator_.Less(
-        temp_tuple_row_, reinterpret_cast<TupleRow*>(&last.current_tuple_))) {
+    while (Less(temp_tuple_row_, reinterpret_cast<TupleRow*>(&last.current_tuple_))) {
       last.Prev();
     }
 
@@ -939,38 +968,43 @@ Sorter::TupleSorter::TupleIterator Sorter::TupleSorter::Partition(TupleIterator
 
     first.Next();
     last.Prev();
+
+    RETURN_IF_CANCELLED(state_);
+    RETURN_IF_ERROR(state_->GetQueryStatus());
   }
 
-  return first;
+  *cut = first;
+  return Status::OK();
 }
 
-void Sorter::TupleSorter::SortHelper(TupleIterator first, TupleIterator last) {
-  if (UNLIKELY(state_->is_cancelled())) return;
+Status Sorter::TupleSorter::SortHelper(TupleIterator first, TupleIterator last) {
   // Use insertion sort for smaller sequences.
   while (last.index_ - first.index_ > INSERTION_THRESHOLD) {
+    // Select a pivot and call Partition() to split the tuples in [first, last) into two
+    // groups (<= pivot and >= pivot) in-place. 'cut' is the index of the first tuple in
+    // the second group.
     Tuple* pivot = SelectPivot(first, last);
-
-    // Partition() splits the tuples in [first, last) into two groups (<= pivot
-    // and >= pivot) in-place. 'cut' is the index of the first tuple in the second group.
-    TupleIterator cut = Partition(first, last, pivot);
+    TupleIterator cut;
+    RETURN_IF_ERROR(Partition(first, last, pivot, &cut));
 
     // Recurse on the smaller partition. This limits stack size to log(n) stack frames.
     if (cut.index_ - first.index_ < last.index_ - cut.index_) {
       // Left partition is smaller.
-      SortHelper(first, cut);
+      RETURN_IF_ERROR(SortHelper(first, cut));
       first = cut;
     } else {
       // Right partition is equal or smaller.
-      SortHelper(cut, last);
+      RETURN_IF_ERROR(SortHelper(cut, last));
       last = cut;
     }
 
-    SortHelper(cut, last);
+    RETURN_IF_ERROR(SortHelper(cut, last));
     last = cut;
-    if (UNLIKELY(state_->is_cancelled())) return;
-  }
 
-  InsertionSort(first, last);
+    RETURN_IF_CANCELLED(state_);
+  }
+  RETURN_IF_ERROR(InsertionSort(first, last));
+  return Status::OK();
 }
 
 Tuple* Sorter::TupleSorter::SelectPivot(TupleIterator first, TupleIterator last) {
@@ -1002,9 +1036,9 @@ Tuple* Sorter::TupleSorter::MedianOfThree(Tuple* t1, Tuple* t2, Tuple* t3) {
   TupleRow* tr2 = reinterpret_cast<TupleRow*>(&t2);
   TupleRow* tr3 = reinterpret_cast<TupleRow*>(&t3);
 
-  bool t1_lt_t2 = comparator_.Less(tr1, tr2);
-  bool t2_lt_t3 = comparator_.Less(tr2, tr3);
-  bool t1_lt_t3 = comparator_.Less(tr1, tr3);
+  bool t1_lt_t2 = Less(tr1, tr2);
+  bool t2_lt_t3 = Less(tr2, tr3);
+  bool t1_lt_t3 = Less(tr1, tr3);
 
   if (t1_lt_t2) {
     // t1 < t2
@@ -1222,8 +1256,7 @@ Status Sorter::SortRun() {
   }
   {
     SCOPED_TIMER(in_mem_sort_timer_);
-    in_mem_tuple_sorter_->Sort(unsorted_run_);
-    RETURN_IF_CANCELLED(state_);
+    RETURN_IF_ERROR(in_mem_tuple_sorter_->Sort(unsorted_run_));
   }
   sorted_runs_.push_back(unsorted_run_);
   unsorted_run_ = NULL;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7501d2e/be/src/util/tuple-row-compare.h
----------------------------------------------------------------------
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index 41035df..8505f19 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -102,7 +102,7 @@ class TupleRowComparator {
       if (lhs_value != NULL && rhs_value == NULL) return -nulls_first_[i];
 
       int result = RawValue::Compare(lhs_value, rhs_value,
-                                     key_expr_ctxs_lhs_[i]->root()->type());
+          key_expr_ctxs_lhs_[i]->root()->type());
       if (!is_asc_[i]) result = -result;
       if (result != 0) return result;
       // Otherwise, try the next Expr
@@ -126,7 +126,17 @@ class TupleRowComparator {
     return Less(lhs_row, rhs_row);
   }
 
+  /// Free any local allocations made during expression evaluations in Compare().
+  void FreeLocalAllocations() const {
+    ExprContext::FreeLocalAllocations(key_expr_ctxs_lhs_);
+    ExprContext::FreeLocalAllocations(key_expr_ctxs_rhs_);
+  }
+
  private:
+  /// Codegen Compare(). Returns a non-OK status if codegen is unsuccessful.
+  /// TODO: have codegen'd users inline this instead of calling through the () operator
+  Status CodegenCompare(RuntimeState* state, llvm::Function** fn);
+
   const std::vector<ExprContext*>& key_expr_ctxs_lhs_;
   const std::vector<ExprContext*>& key_expr_ctxs_rhs_;
   std::vector<bool> is_asc_;
@@ -142,10 +152,6 @@ class TupleRowComparator {
   typedef int (*CompareFn)(ExprContext* const*, ExprContext* const*, TupleRow*,
       TupleRow*);
   CompareFn* codegend_compare_fn_;
-
-  /// Codegen Compare(). Returns a non-OK status if codegen is unsuccessful.
-  /// TODO: have codegen'd users inline this instead of calling through the () operator
-  Status CodegenCompare(RuntimeState* state, llvm::Function** fn);
 };
 
 /// Compares the equality of two Tuples, going slot by slot.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7501d2e/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index a29c6c7..c35ddec 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -393,11 +393,10 @@ row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
 ---- QUERY
 # Test sort with inlined char column materialized by exprs.
 # Set low memory limit to force spilling.
+# IMPALA-3332: comparator makes local allocations that cause runaway memory consumption.
 set num_nodes=0;
 set max_block_mgr_memory=4m;
-# IMPALA-3332: comparator makes local allocations that cause runaway memory consumption.
-# When IMPALA-3332 is fixed, can reenable this memory limit.
-#set mem_limit=200m;
+set mem_limit=200m;
 set disable_outermost_topn=1;
 select cast(l_comment as char(50))
 from lineitem
@@ -513,8 +512,7 @@ row_regex: .*TotalMergesPerformed: .* \([1-9][0-9]*\)
 set num_nodes=0;
 set max_block_mgr_memory=4m;
 # IMPALA-3332: comparator makes local allocations that cause runaway memory consumption.
-# When IMPALA-3332 is fixed, can reenable this memory limit.
-#set mem_limit=200m;
+set mem_limit=200m;
 set disable_outermost_topn=1;
 select cast(l_comment as varchar(50))
 from lineitem