You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mj...@apache.org on 2017/07/18 19:39:54 UTC

[2/2] incubator-impala git commit: IMPALA-5520: TopN node periodically reclaims old allocations

IMPALA-5520: TopN node periodically reclaims old allocations

Currently TopN retains old string allocations in a tuple pool which is
held longer than necessary, resulting in unnecessary memory usage.
With this commit, the TopN node will periodically re-materialise the
rows stored in the priority queue and reclaim the old allocations.
This is done when the number of rows removed from the priority queue
is more than twice the N (limit + offset). Moreover, a new counter
called "TuplePoolReclamations" is added to the TopN node that keeps
track of the number of times the tuple pool is reclaimed.

Testing:
Test added to test_queries.py which sets a low mem_limit such
that the test would fail if reclamation is not implemented and pass
otherwise.

Performance:
Query 1 (expected general case):
select * from tpch.lineitem order by l_orderkey desc limit 10;

Query 2 (example worst case: data stored in reverse order before
feeding to the last TopN node):
select * from (select * from tpch.lineitem order by l_orderkey desc
limit 6001215) tb order by l_orderkey limit 10;

                       With Reclaim           Without Reclaim
                   Query 1     Query 2      Query 1     Query 2
MaxTuplePoolMem    3.96 KB     3.43 KB      110.2 MB    708.8 MB
Time (mean)        2s 218ms    6s 391ms     2s 021ms    6s 406ms
Time (stdev)       74.38ms     67.45ms      102.71ms    70.44ms
Reclaims            910         5861          N/A         N/A

We notice that memory footprint is orders of magnitude lower while
maintaining similar query runtimes. Cluster perf testing will be done
later.

Change-Id: I968f57f0ff2905bd581908bc5c5ee486b31e6aa8
Reviewed-on: http://gerrit.cloudera.org:8080/7400
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/bc2250f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/bc2250f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/bc2250f8

Branch: refs/heads/master
Commit: bc2250f84f4cb134ed97edad2be91ec6229b88bc
Parents: 19005e6
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Thu Jul 6 16:48:30 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Jul 18 02:50:56 2017 +0000

----------------------------------------------------------------------
 be/src/exec/topn-node-ir.cc      | 15 ++++++----
 be/src/exec/topn-node.cc         | 56 ++++++++++++++++++++++++++++-------
 be/src/exec/topn-node.h          | 36 ++++++++++++++++++----
 tests/query_test/test_queries.py | 35 ++++++++++++++++++++++
 4 files changed, 120 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc2250f8/be/src/exec/topn-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node-ir.cc b/be/src/exec/topn-node-ir.cc
index 0acd724..590a4c8 100644
--- a/be/src/exec/topn-node-ir.cc
+++ b/be/src/exec/topn-node-ir.cc
@@ -29,14 +29,14 @@ void TopNNode::InsertBatch(RowBatch* batch) {
 void TopNNode::InsertTupleRow(TupleRow* input_row) {
   Tuple* insert_tuple = nullptr;
 
-  if (priority_queue_->size() < limit_ + offset_) {
+  if (priority_queue_.size() < limit_ + offset_) {
     insert_tuple = reinterpret_cast<Tuple*>(
         tuple_pool_->Allocate(output_tuple_desc_->byte_size()));
     insert_tuple->MaterializeExprs<false, false>(input_row, *output_tuple_desc_,
         output_tuple_expr_evals_, tuple_pool_.get());
   } else {
-    DCHECK(!priority_queue_->empty());
-    Tuple* top_tuple = priority_queue_->top();
+    DCHECK(!priority_queue_.empty());
+    Tuple* top_tuple = priority_queue_.front();
     tmp_tuple_->MaterializeExprs<false, true>(input_row, *output_tuple_desc_,
         output_tuple_expr_evals_, nullptr);
     if (tuple_row_less_than_->Less(tmp_tuple_, top_tuple)) {
@@ -44,9 +44,14 @@ void TopNNode::InsertTupleRow(TupleRow* input_row) {
       // to be fixed to use a freelist
       tmp_tuple_->DeepCopy(top_tuple, *output_tuple_desc_, tuple_pool_.get());
       insert_tuple = top_tuple;
-      priority_queue_->pop();
+      PopHeap(&priority_queue_,
+          ComparatorWrapper<TupleRowComparator>(*tuple_row_less_than_));
+      rows_to_reclaim_++;
     }
   }
 
-  if (insert_tuple != nullptr) priority_queue_->push(insert_tuple);
+  if (insert_tuple != nullptr) {
+    PushHeap(&priority_queue_,
+        ComparatorWrapper<TupleRowComparator>(*tuple_row_less_than_), insert_tuple);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc2250f8/be/src/exec/topn-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index 3b38e9d..9b94fe7 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -23,6 +23,7 @@
 #include "exprs/scalar-expr.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple.h"
@@ -47,8 +48,9 @@ TopNNode::TopNNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
     tmp_tuple_(NULL),
     tuple_pool_(NULL),
     codegend_insert_batch_fn_(NULL),
-    num_rows_skipped_(0),
-    priority_queue_(NULL) {
+    rows_to_reclaim_(0),
+    tuple_pool_reclaim_counter_(NULL),
+    num_rows_skipped_(0) {
 }
 
 Status TopNNode::Init(const TPlanNode& tnode, RuntimeState* state) {
@@ -76,12 +78,11 @@ Status TopNNode::Prepare(RuntimeState* state) {
   AddEvaluatorsToFree(output_tuple_expr_evals_);
   tuple_row_less_than_.reset(
       new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
-  priority_queue_.reset(
-      new priority_queue<Tuple*, vector<Tuple*>, ComparatorWrapper<TupleRowComparator>>(
-          *tuple_row_less_than_));
   output_tuple_desc_ = row_descriptor_.tuple_descriptors()[0];
   insert_batch_timer_ = ADD_TIMER(runtime_profile(), "InsertBatchTime");
   AddCodegenDisabledMessage(state);
+  tuple_pool_reclaim_counter_ = ADD_COUNTER(runtime_profile(), "TuplePoolReclamations",
+      TUnit::UNIT);
   return Status::OK();
 }
 
@@ -162,12 +163,16 @@ Status TopNNode::Open(RuntimeState* state) {
         } else {
           InsertBatch(&batch);
         }
+        if (rows_to_reclaim_ > 2 * (limit_ + offset_)) {
+          RETURN_IF_ERROR(ReclaimTuplePool(state));
+          COUNTER_ADD(tuple_pool_reclaim_counter_, 1);
+        }
       }
       RETURN_IF_CANCELLED(state);
       RETURN_IF_ERROR(QueryMaintenance(state));
     } while (!eos);
   }
-  DCHECK_LE(priority_queue_->size(), limit_ + offset_);
+  DCHECK_LE(priority_queue_.size(), limit_ + offset_);
   PrepareForOutput();
 
   // Unless we are inside a subplan expecting to call Open()/GetNext() on the child
@@ -208,7 +213,7 @@ Status TopNNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
 }
 
 Status TopNNode::Reset(RuntimeState* state) {
-  while(!priority_queue_->empty()) priority_queue_->pop();
+  priority_queue_.clear();
   num_rows_skipped_ = 0;
   // We deliberately do not free the tuple_pool_ here to allow selective transferring
   // of resources in the future.
@@ -232,12 +237,13 @@ Status TopNNode::QueryMaintenance(RuntimeState* state) {
 
 // Reverse the order of the tuples in the priority queue
 void TopNNode::PrepareForOutput() {
-  sorted_top_n_.resize(priority_queue_->size());
+  sorted_top_n_.resize(priority_queue_.size());
   int64_t index = sorted_top_n_.size() - 1;
 
-  while (priority_queue_->size() > 0) {
-    Tuple* tuple = priority_queue_->top();
-    priority_queue_->pop();
+  while (priority_queue_.size() > 0) {
+    Tuple* tuple = priority_queue_.front();
+    PopHeap(&priority_queue_,
+        ComparatorWrapper<TupleRowComparator>(*tuple_row_less_than_));
     sorted_top_n_[index] = tuple;
     --index;
   }
@@ -245,6 +251,34 @@ void TopNNode::PrepareForOutput() {
   get_next_iter_ = sorted_top_n_.begin();
 }
 
+Status TopNNode::ReclaimTuplePool(RuntimeState* state) {
+  unique_ptr<MemPool> temp_pool(new MemPool(mem_tracker()));
+
+  for (int i = 0; i < priority_queue_.size(); i++) {
+    Tuple* insert_tuple = reinterpret_cast<Tuple*>(temp_pool->TryAllocate(
+        output_tuple_desc_->byte_size()));
+    if (UNLIKELY(insert_tuple == nullptr)) {
+      return temp_pool->mem_tracker()->MemLimitExceeded(state,
+          "Failed to allocate memory in TopNNode::ReclaimTuplePool.",
+          output_tuple_desc_->byte_size());
+    }
+    priority_queue_[i]->DeepCopy(insert_tuple, *output_tuple_desc_, temp_pool.get());
+    priority_queue_[i] = insert_tuple;
+  }
+
+  rows_to_reclaim_ = 0;
+  tmp_tuple_ = reinterpret_cast<Tuple*>(temp_pool->TryAllocate(
+      output_tuple_desc_->byte_size()));
+  if (UNLIKELY(tmp_tuple_ == nullptr)) {
+    return temp_pool->mem_tracker()->MemLimitExceeded(state,
+        "Failed to allocate memory in TopNNode::ReclaimTuplePool.",
+        output_tuple_desc_->byte_size());
+  }
+  tuple_pool_->FreeAll();
+  tuple_pool_.reset(temp_pool.release());
+  return Status::OK();
+}
+
 void TopNNode::DebugString(int indentation_level, stringstream* out) const {
   *out << string(indentation_level * 2, ' ');
   *out << "TopNNode("

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc2250f8/be/src/exec/topn-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.h b/be/src/exec/topn-node.h
index d7daacb..b3cf1c0 100644
--- a/be/src/exec/topn-node.h
+++ b/be/src/exec/topn-node.h
@@ -68,6 +68,24 @@ class TopNNode : public ExecNode {
   /// Flatten and reverse the priority queue.
   void PrepareForOutput();
 
+  // Re-materialize the elements in the priority queue into a new tuple pool, and release
+  // the previous pool.
+  Status ReclaimTuplePool(RuntimeState* state);
+
+  /// Helper methods for modifying priority_queue while maintaining ordered heap
+  /// invariants
+  inline static void PushHeap(std::vector<Tuple*>* priority_queue,
+      const ComparatorWrapper<TupleRowComparator>& comparator, Tuple* const insert_row) {
+    priority_queue->push_back(insert_row);
+    std::push_heap(priority_queue->begin(), priority_queue->end(), comparator);
+  }
+
+  inline static void PopHeap(std::vector<Tuple*>* priority_queue,
+      const ComparatorWrapper<TupleRowComparator>& comparator) {
+    std::pop_heap(priority_queue->begin(), priority_queue->end(), comparator);
+    priority_queue->pop_back();
+  }
+
   /// Number of rows to skip.
   int64_t offset_;
 
@@ -107,18 +125,24 @@ class TopNNode : public ExecNode {
   /// Timer for time spent in InsertBatch() function (or codegen'd version)
   RuntimeProfile::Counter* insert_batch_timer_;
 
+  /// Number of rows to be reclaimed since tuple_pool_ was last created/reclaimed
+  int64_t rows_to_reclaim_;
+
+  /// Number of times tuple pool memory was reclaimed
+  RuntimeProfile::Counter* tuple_pool_reclaim_counter_;
+
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
   /// Number of rows skipped. Used for adhering to offset_.
   int64_t num_rows_skipped_;
 
-  /// The priority queue will never have more elements in it than the LIMIT + OFFSET.
-  /// The stl priority queue doesn't support a max size, so to get that functionality,
-  /// the order of the queue is the opposite of what the ORDER BY clause specifies, such
-  /// that the top of the queue is the last sorted element.
-  boost::scoped_ptr<std::priority_queue<Tuple*, std::vector<Tuple*>,
-      ComparatorWrapper<TupleRowComparator>>> priority_queue_;
+  /// The priority queue (represented by a vector and modified using
+  /// push_heap()/pop_heap() to maintain ordered heap invariants) will never have more
+  /// elements in it than the LIMIT + OFFSET. The order of the queue is the opposite of
+  /// what the ORDER BY clause specifies, such that the top of the queue is the last
+  /// sorted element.
+  std::vector<Tuple*> priority_queue_;
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc2250f8/tests/query_test/test_queries.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index 40ca7f3..04d70fc 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -19,6 +19,7 @@
 
 import copy
 import pytest
+import re
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import create_uncompressed_text_dimension
@@ -217,3 +218,37 @@ class TestHdfsQueries(TestQueries):
 
   def test_file_partitions(self, vector):
     self.run_test_case('QueryTest/hdfs-partitions', vector)
+
+class TestTopNReclaimQuery(ImpalaTestSuite):
+  """Test class to validate that TopN periodically reclaims tuple pool memory
+   and runs with a lower memory footprint."""
+  QUERY = "select * from tpch.lineitem order by l_orderkey desc limit 10;"
+
+  # Mem limit empirically selected so that the query fails if tuple pool reclamation
+  # is not implemented for TopN
+  MEM_LIMIT = "50m"
+
+  @classmethod
+  def get_workload(self):
+    return 'tpch'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTopNReclaimQuery, cls).add_test_dimensions()
+    # The tpch tests take a long time to execute so restrict the combinations they
+    # execute over.
+    cls.ImpalaTestMatrix.add_dimension(
+      create_uncompressed_text_dimension(cls.get_workload()))
+
+  def test_top_n_reclaim(self, vector):
+    exec_options = vector.get_value('exec_option')
+    exec_options['mem_limit'] = self.MEM_LIMIT
+    result = self.execute_query(self.QUERY, exec_options)
+    runtime_profile = str(result.runtime_profile)
+    num_of_times_tuple_pool_reclaimed = re.findall(
+      'TuplePoolReclamations: ([0-9]*)', runtime_profile)
+    # Confirm newly added counter is visible
+    assert len(num_of_times_tuple_pool_reclaimed) > 0
+    # Tuple pool is expected to be reclaimed for this query
+    for n in num_of_times_tuple_pool_reclaimed:
+      assert int(n) > 0