Posted to commits@impala.apache.org by ta...@apache.org on 2017/10/06 00:03:43 UTC

[4/4] incubator-impala git commit: IMPALA-5844: use a MemPool for expr result allocations

IMPALA-5844: use a MemPool for expr result allocations

This is also a step towards IMPALA-2399 (remove QueryMaintenance()).

"local" allocations containing expression results (either intermediate
or final results) have the following properties:
* They are usually small allocations
* They can be made frequently (e.g. every function call)
* They are owned and managed by the Impala runtime
* They are freed in bulk at various points in query execution.

A MemPool (i.e. a bump allocator) is the right mechanism to manage
allocations with the above properties. Before this patch,
FunctionContexts used a FreePool plus a vector of allocations to
emulate the above behaviour. This patch switches to using a MemPool to
bring these allocations in line with the rest of the codebase.
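
For reference, here is a minimal sketch of the bump-allocation pattern.
It is illustrative only: the real MemPool (be/src/runtime/mem-pool.h)
recycles chunks and charges a MemTracker, and the names below are
simplified assumptions, not the actual impala::MemPool API.

    // Illustrative sketch of a bump allocator with bulk freeing.
    #include <algorithm>
    #include <cstdint>
    #include <cstdlib>
    #include <vector>

    class BumpPool {
     public:
      ~BumpPool() { Clear(); }

      uint8_t* Allocate(int64_t size) {
        if (size <= 0) return nullptr;
        if (chunks_.empty() || offset_ + size > capacity_) Grow(size);
        uint8_t* result = chunks_.back() + offset_;
        offset_ += size;  // Bump the pointer; no per-allocation bookkeeping.
        return result;
      }

      // Free everything at once; this is the property that makes the pool a
      // good fit for small, frequent, runtime-managed expression results.
      void Clear() {
        for (uint8_t* chunk : chunks_) std::free(chunk);
        chunks_.clear();
        offset_ = 0;
        capacity_ = 0;
      }

     private:
      void Grow(int64_t min_size) {
        capacity_ = std::max<int64_t>(min_size, 4096);
        chunks_.push_back(static_cast<uint8_t*>(std::malloc(capacity_)));
        offset_ = 0;
      }

      std::vector<uint8_t*> chunks_;
      int64_t offset_ = 0;
      int64_t capacity_ = 0;
    };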

The steps required for this conversion:
* Use a MemPool for FunctionContext local allocations.
* Identify appropriate MemPools for all of the local allocations from
  function contexts so that the memory lifetime is correct.
* Various cleanup and documentation of existing MemPools.
* Replace calls to FreeLocalAllocations() with calls to
  MemPool::Clear() (see the usage sketch after this list).
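
A hedged usage sketch of the resulting per-batch lifecycle, reusing the
BumpPool above in place of MemPool. EvaluateExprsForBatch() is a
hypothetical stand-in for real expression evaluation.

    // Per-batch lifecycle of the two pools, assuming the BumpPool sketch above.
    #include <cstring>

    void EvaluateExprsForBatch(BumpPool* results_pool) {
      // Expression results (e.g. intermediate strings) are bump-allocated.
      char* buf = reinterpret_cast<char*>(results_pool->Allocate(16));
      std::memcpy(buf, "expr result", 12);
    }

    int main() {
      BumpPool expr_perm_pool;     // Lives until Close(), e.g. evaluator state.
      BumpPool expr_results_pool;  // Holds expr results; cleared per batch.
      uint8_t* eval_state = expr_perm_pool.Allocate(64);  // Permanent scratch.
      (void)eval_state;
      for (int batch = 0; batch < 100; ++batch) {
        EvaluateExprsForBatch(&expr_results_pool);
        // Consume or copy out the results, then release them in bulk. This
        // single Clear() replaces the old per-evaluator
        // FreeLocalAllocations() calls.
        expr_results_pool.Clear();
      }
      return 0;
    }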

More involved surgery was required in a few places:
* Made the Sorter own its comparator, exprs and MemPool.
* Remove FunctionContextImpl::ReallocateLocal() and just have
  StringFunctions::Replace() do the doubling itself to avoid the need
  for a special interface (see the sketch after this list). In the
  worst case this doubles the memory requirements for Replace(), since
  n/2 + n/4 + n/8 + ... bytes of memory could be wasted instead of
  recycled for an n-byte output string.
* Provide a way to redirect agg fn Serialize()/Finalize() allocations
  so they come directly from the output RowBatch's MemPool. This is
  also potentially applicable to other places where we currently copy
  strings out of local allocations, e.g.
  AnalyticEvalNode::AddResultTuple() and Tuple::MaterializeExprs().
* --stress_free_pool_alloc was renamed to --stress_fn_ctx_alloc and
  changed to intercept at the FunctionContext layer, so it retains the
  old behaviour even though allocations no longer all come from
  FreePools.
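
A hedged sketch of the doubling strategy for Replace(), again reusing
the BumpPool above rather than the StringVal/FunctionContext code in
be/src/exprs/string-functions-ir.cc. This illustrates the strategy, not
the committed implementation.

    // Builds the replaced string in a pool-backed buffer, doubling the buffer
    // whenever it fills up. A bump pool cannot resize in place, so each
    // outgrown buffer (n/2, n/4, ... bytes) stays unused in the pool until
    // the pool is cleared.
    #include <cstdint>
    #include <cstring>
    #include <string>

    char* ReplaceAll(BumpPool* pool, const std::string& src,
                     const std::string& from, const std::string& to,
                     int64_t* result_len) {
      int64_t cap = src.size() + 1;
      char* buf = reinterpret_cast<char*>(pool->Allocate(cap));
      int64_t out_len = 0;
      size_t pos = 0;
      while (pos <= src.size()) {
        size_t next = from.empty() ? std::string::npos : src.find(from, pos);
        size_t end = (next == std::string::npos) ? src.size() : next;
        size_t piece = (end - pos) + (next == std::string::npos ? 0 : to.size());
        while (out_len + static_cast<int64_t>(piece) > cap) {
          // Out of room: allocate a doubled buffer and copy. The old buffer
          // is wasted (not recycled) until the results pool is cleared.
          char* bigger = reinterpret_cast<char*>(pool->Allocate(cap * 2));
          std::memcpy(bigger, buf, out_len);
          buf = bigger;
          cap *= 2;
        }
        std::memcpy(buf + out_len, src.data() + pos, end - pos);
        out_len += end - pos;
        if (next == std::string::npos) break;
        std::memcpy(buf + out_len, to.data(), to.size());
        out_len += to.size();
        pos = next + from.size();
      }
      *result_len = out_len;
      return buf;
    }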

The "local" allocation concept was not exposed directly in udf.h, so
this patch also renames these allocations to better reflect that they
hold expr results.

Testing:
* ran exhaustive and ASAN

Change-Id: I4ba5a7542ed90a49a4b5586c040b5985a7d45b61
Reviewed-on: http://gerrit.cloudera.org:8080/8025
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c14a0904
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c14a0904
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c14a0904

Branch: refs/heads/master
Commit: c14a0904005ce150e15820c8bb9ade94fb58ed56
Parents: d8bdea5
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Jul 24 22:51:05 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Oct 6 00:01:08 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc             |   3 +-
 be/src/common/global-flags.cc                   |   6 +-
 be/src/exec/analytic-eval-node.cc               |  31 ++--
 be/src/exec/analytic-eval-node.h                |   8 -
 be/src/exec/data-sink.cc                        |   8 +-
 be/src/exec/data-sink.h                         |  11 +-
 be/src/exec/exchange-node.cc                    |  10 +-
 be/src/exec/exchange-node.h                     |   1 -
 be/src/exec/exec-node.cc                        |  21 +--
 be/src/exec/exec-node.h                         |  43 ++---
 be/src/exec/filter-context.cc                   |   5 +-
 be/src/exec/filter-context.h                    |   2 +-
 be/src/exec/hash-table-test.cc                  |  24 ++-
 be/src/exec/hash-table.cc                       |  38 ++---
 be/src/exec/hash-table.h                        |  40 +++--
 be/src/exec/hbase-table-sink.cc                 |   2 +-
 be/src/exec/hdfs-parquet-scanner.cc             |  30 ++--
 be/src/exec/hdfs-parquet-scanner.h              |   5 -
 be/src/exec/hdfs-scan-node-base.cc              |   4 +-
 be/src/exec/hdfs-scan-node-mt.cc                |   3 +-
 be/src/exec/hdfs-scan-node.cc                   |  15 +-
 be/src/exec/hdfs-scan-node.h                    |   2 +-
 be/src/exec/hdfs-scanner.cc                     |  13 +-
 be/src/exec/hdfs-scanner.h                      |   8 +-
 be/src/exec/hdfs-table-sink.cc                  |   6 +-
 be/src/exec/kudu-scanner.cc                     |  12 +-
 be/src/exec/kudu-scanner.h                      |  11 +-
 be/src/exec/kudu-table-sink.cc                  |   2 +-
 be/src/exec/nested-loop-join-node.cc            |   4 +-
 be/src/exec/partial-sort-node.cc                |  18 +-
 be/src/exec/partial-sort-node.h                 |   4 -
 be/src/exec/partitioned-aggregation-node.cc     | 128 +++++---------
 be/src/exec/partitioned-aggregation-node.h      |  41 ++---
 be/src/exec/partitioned-hash-join-builder.cc    |  30 ++--
 be/src/exec/partitioned-hash-join-builder.h     |   5 +-
 be/src/exec/partitioned-hash-join-node.cc       |  43 ++---
 be/src/exec/partitioned-hash-join-node.h        |   6 +-
 be/src/exec/plan-root-sink.cc                   |   3 +-
 be/src/exec/scan-node.cc                        |   3 +-
 be/src/exec/scanner-context.cc                  |   5 +-
 be/src/exec/scanner-context.h                   |  17 +-
 be/src/exec/sort-node.cc                        |  18 +-
 be/src/exec/sort-node.h                         |   4 -
 be/src/exec/topn-node.cc                        |  11 +-
 be/src/exec/topn-node.h                         |   1 -
 be/src/exec/union-node.cc                       |   6 +-
 be/src/exec/unnest-node.cc                      |   2 +-
 be/src/exprs/agg-fn-evaluator.cc                |  59 ++++---
 be/src/exprs/agg-fn-evaluator.h                 | 121 ++++++++-----
 be/src/exprs/case-expr.cc                       |   4 +-
 be/src/exprs/cast-functions-ir.cc               |   2 +-
 be/src/exprs/expr-test.cc                       |  12 +-
 be/src/exprs/hive-udf-call.cc                   |   4 +-
 be/src/exprs/scalar-expr-evaluator.cc           |  84 ++++-----
 be/src/exprs/scalar-expr-evaluator.h            |  69 ++++----
 be/src/exprs/scalar-fn-call.cc                  |   2 +-
 be/src/exprs/string-functions-ir.cc             |  35 ++--
 be/src/runtime/CMakeLists.txt                   |   1 -
 be/src/runtime/data-stream-sender.cc            |   5 +-
 be/src/runtime/data-stream-test.cc              |   3 +-
 be/src/runtime/descriptors.cc                   |   2 +-
 be/src/runtime/free-pool.cc                     |  28 ---
 be/src/runtime/free-pool.h                      |  20 ---
 be/src/runtime/sorted-run-merger.cc             |   3 -
 be/src/runtime/sorter.cc                        |  68 +++++---
 be/src/runtime/sorter.h                         |  38 +++--
 be/src/runtime/tuple.cc                         |   3 +-
 be/src/service/fe-support.cc                    |   2 +-
 be/src/udf/udf-internal.h                       |  72 ++++----
 be/src/udf/udf-test-harness.cc                  |   3 +-
 be/src/udf/udf.cc                               | 169 ++++++++++---------
 be/src/udf/udf.h                                |  34 ++--
 be/src/util/tuple-row-compare.cc                |  10 +-
 be/src/util/tuple-row-compare.h                 |  16 +-
 .../queries/QueryTest/alloc-fail-init.test      |   6 +-
 tests/custom_cluster/test_alloc_fail.py         |   4 +-
 76 files changed, 753 insertions(+), 839 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index 093d2b1..f1208f8 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -113,7 +113,8 @@ static Status PrepareSelectList(
   RuntimeState* state = planner->GetRuntimeState();
   ScalarExpr* expr;
   RETURN_IF_ERROR(ScalarExpr::Create(texprs[0], RowDescriptor(), state, &expr));
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(*expr, state, &pool, &mem_pool, eval));
+  RETURN_IF_ERROR(
+      ScalarExprEvaluator::Create(*expr, state, &pool, &mem_pool, &mem_pool, eval));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 1a8b027..5d3879f 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -120,9 +120,9 @@ DEFINE_bool(load_auth_to_local_rules, false, "If true, load auth_to_local config
 
 // Stress options that are only enabled in debug builds for testing.
 #ifndef NDEBUG
-DEFINE_int32(stress_free_pool_alloc, 0, "A stress option which causes memory allocations "
-    "to fail once every n allocations where n is the value of this flag. Effective in "
-    "debug builds only.");
+DEFINE_int32(stress_fn_ctx_alloc, 0, "A stress option which causes memory allocations "
+    "in function contexts to fail once every n allocations where n is the value of this "
+    "flag. Effective in debug builds only.");
 DEFINE_int32(stress_datastream_recvr_delay_ms, 0, "A stress option that causes data "
     "stream receiver registration to be delayed. Effective in debug builds only.");
 DEFINE_bool(skip_file_runtime_filtering, false, "Skips file-based runtime filtering in "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index a3551f3..bc22d82 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -161,23 +161,20 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) {
   curr_tuple_pool_.reset(new MemPool(mem_tracker()));
   prev_tuple_pool_.reset(new MemPool(mem_tracker()));
   mem_pool_.reset(new MemPool(mem_tracker()));
-  fn_pool_.reset(new MemPool(expr_mem_tracker()));
   evaluation_timer_ = ADD_TIMER(runtime_profile(), "EvaluationTime");
 
   DCHECK_EQ(result_tuple_desc_->slots().size(), analytic_fns_.size());
-  RETURN_IF_ERROR(AggFnEvaluator::Create(analytic_fns_, state, pool_, fn_pool_.get(),
-      &analytic_fn_evals_));
+  RETURN_IF_ERROR(AggFnEvaluator::Create(analytic_fns_, state, pool_, expr_perm_pool(),
+      expr_results_pool(), &analytic_fn_evals_));
 
   if (partition_by_eq_expr_ != nullptr) {
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(*partition_by_eq_expr_, state, pool_,
-        fn_pool_.get(), &partition_by_eq_expr_eval_));
-    AddEvaluatorToFree(partition_by_eq_expr_eval_);
+        expr_perm_pool(), expr_results_pool(), &partition_by_eq_expr_eval_));
   }
 
   if (order_by_eq_expr_ != nullptr) {
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(*order_by_eq_expr_, state, pool_,
-        fn_pool_.get(), &order_by_eq_expr_eval_));
-    AddEvaluatorToFree(order_by_eq_expr_eval_);
+        expr_perm_pool(), expr_results_pool(), &order_by_eq_expr_eval_));
   }
   return Status::OK();
 }
@@ -382,12 +379,13 @@ Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
   Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(), cur_tuple_pool);
 
   AggFnEvaluator::GetValue(analytic_fn_evals_, curr_tuple_, result_tuple);
-  // Copy any string data in 'result_tuple' into 'cur_tuple_pool_'.
-  for (const SlotDescriptor* slot_desc : result_tuple_desc_->slots()) {
-    if (!slot_desc->type().IsVarLenStringType()) continue;
-    StringValue* sv = reinterpret_cast<StringValue*>(
-        result_tuple->GetSlot(slot_desc->tuple_offset()));
-    if (sv == nullptr || sv->len == 0) continue;
+  // Copy any string data in 'result_tuple' into 'cur_tuple_pool'. The var-len data
+  // returned by GetValue() may be backed by an allocation from
+  // 'expr_results_pool_' that will be recycled so it must be copied out.
+  for (const SlotDescriptor* slot_desc : result_tuple_desc_->string_slots()) {
+    if (result_tuple->IsNull(slot_desc->null_indicator_offset())) continue;
+    StringValue* sv = result_tuple->GetStringSlot(slot_desc->tuple_offset());
+    if (sv->len == 0) continue;
     char* new_ptr = reinterpret_cast<char*>(
         cur_tuple_pool->TryAllocateUnaligned(sv->len));
     if (UNLIKELY(new_ptr == nullptr)) {
@@ -889,7 +887,6 @@ void AnalyticEvalNode::Close(RuntimeState* state) {
   if (curr_tuple_pool_.get() != nullptr) curr_tuple_pool_->FreeAll();
   if (prev_tuple_pool_.get() != nullptr) prev_tuple_pool_->FreeAll();
   if (mem_pool_.get() != nullptr) mem_pool_->FreeAll();
-  if (fn_pool_.get() != nullptr) fn_pool_->FreeAll();
   ExecNode::Close(state);
 }
 
@@ -907,10 +904,4 @@ void AnalyticEvalNode::DebugString(int indentation_level, stringstream* out) con
   ExecNode::DebugString(indentation_level, out);
   *out << ")";
 }
-
-Status AnalyticEvalNode::QueryMaintenance(RuntimeState* state) {
-  AggFnEvaluator::FreeLocalAllocations(analytic_fn_evals_);
-  return ExecNode::QueryMaintenance(state);
-}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 671eaa4..c7e7873 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -71,9 +71,6 @@ class AnalyticEvalNode : public ExecNode {
   virtual void Close(RuntimeState* state);
 
  protected:
-  /// Frees local allocations from analytic_fn_evals_
-  virtual Status QueryMaintenance(RuntimeState* state);
-
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
  private:
@@ -243,11 +240,6 @@ class AnalyticEvalNode : public ExecNode {
   bool has_first_val_null_offset_;
   long first_val_null_offset_;
 
-  /// Mem pool backing allocations from fn_ctxs_. This pool must not be Reset() because
-  /// the memory is managed by the FreePools of the function contexts which do their own
-  /// bookkeeping using a pointer-based structure stored in the memory blocks themselves.
-  boost::scoped_ptr<MemPool> fn_pool_;
-
   /// Pools used to allocate result tuples (added to result_tuples_ and later returned)
   /// and window tuples (added to window_tuples_ to buffer the current window). Resources
   /// are transferred from curr_tuple_pool_ to prev_tuple_pool_ once it is at least

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index c173ed5..a319860 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -180,9 +180,10 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   mem_tracker_.reset(new MemTracker(profile_, -1, name, parent_mem_tracker));
   expr_mem_tracker_.reset(
       new MemTracker(-1, Substitute("$0 Exprs", name), mem_tracker_.get(), false));
-  expr_mem_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+  expr_perm_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+  expr_results_pool_.reset(new MemPool(expr_mem_tracker_.get()));
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_exprs_, state, state->obj_pool(),
-      expr_mem_pool(), &output_expr_evals_));
+      expr_perm_pool_.get(), expr_results_pool_.get(), &output_expr_evals_));
   return Status::OK();
 }
 
@@ -195,7 +196,8 @@ void DataSink::Close(RuntimeState* state) {
   if (closed_) return;
   ScalarExprEvaluator::Close(output_expr_evals_, state);
   ScalarExpr::Close(output_exprs_);
-  if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll();
+  if (expr_perm_pool_ != nullptr) expr_perm_pool_->FreeAll();
+  if (expr_results_pool_.get() != nullptr) expr_results_pool_->FreeAll();
   if (expr_mem_tracker_ != nullptr) expr_mem_tracker_->Close();
   if (mem_tracker_ != nullptr) mem_tracker_->Close();
   closed_ = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 8e870f9..448eb8a 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -99,7 +99,6 @@ class DataSink {
 
   MemTracker* mem_tracker() const { return mem_tracker_.get(); }
   RuntimeProfile* profile() const { return profile_; }
-  MemPool* expr_mem_pool() const { return expr_mem_pool_.get(); }
   const std::vector<ScalarExprEvaluator*>& output_expr_evals() const {
     return output_expr_evals_;
   }
@@ -122,8 +121,14 @@ class DataSink {
   /// A child of 'mem_tracker_' that tracks expr allocations. Initialized in Prepare().
   boost::scoped_ptr<MemTracker> expr_mem_tracker_;
 
-  /// MemPool for backing data structures in expressions and their evaluators.
-  boost::scoped_ptr<MemPool> expr_mem_pool_;
+  /// MemPool for allocations made by expression evaluators in this sink that are
+  /// "permanent" and live until Close() is called.
+  boost::scoped_ptr<MemPool> expr_perm_pool_;
+
+  /// MemPool for allocations made by expression evaluators in this sink that hold
+  /// intermediate or final results of expression evaluation. Should be cleared
+  /// periodically to free accumulated memory.
+  boost::scoped_ptr<MemPool> expr_results_pool_;
 
   /// Output expressions to convert row batches onto output values.
   /// Not used in some sub-classes.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index a0c002a..b39bcbf 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -109,7 +109,8 @@ Status ExchangeNode::Open(RuntimeState* state) {
   if (is_merging_) {
     // CreateMerger() will populate its merging heap with batches from the stream_recvr_,
     // so it is not necessary to call FillInputRowBatch().
-    RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));
+    RETURN_IF_ERROR(
+        less_than_->Open(pool_, state, expr_perm_pool(), expr_results_pool()));
     RETURN_IF_ERROR(stream_recvr_->CreateMerger(*less_than_.get()));
   } else {
     RETURN_IF_ERROR(FillInputRowBatch(state));
@@ -130,11 +131,6 @@ void ExchangeNode::Close(RuntimeState* state) {
   ExecNode::Close(state);
 }
 
-Status ExchangeNode::QueryMaintenance(RuntimeState* state) {
-  if (less_than_.get() != nullptr) less_than_->FreeLocalAllocations();
-  return ExecNode::QueryMaintenance(state);
-}
-
 Status ExchangeNode::FillInputRowBatch(RuntimeState* state) {
   DCHECK(!is_merging_);
   Status ret_status;
@@ -208,6 +204,8 @@ Status ExchangeNode::GetNextMerging(RuntimeState* state, RowBatch* output_batch,
   DCHECK_EQ(output_batch->num_rows(), 0);
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
+  // Clear any expr result allocations made by the merger.
+  expr_results_pool_->Clear();
   RETURN_IF_ERROR(stream_recvr_->GetNext(output_batch, eos));
 
   while (num_rows_skipped_ < offset_) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/exchange-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index 4bb6ce3..aaf44c2 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -58,7 +58,6 @@ class ExchangeNode : public ExecNode {
   void set_num_senders(int num_senders) { num_senders_ = num_senders; }
 
  protected:
-  virtual Status QueryMaintenance(RuntimeState* state);
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index b656d2b..aaca8be 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -155,16 +155,16 @@ Status ExecNode::Prepare(RuntimeState* state) {
   mem_tracker_.reset(new MemTracker(runtime_profile_, -1, runtime_profile_->name(),
       state->instance_mem_tracker()));
   expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));
-  expr_mem_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+  expr_perm_pool_.reset(new MemPool(expr_mem_tracker_.get()));
+  expr_results_pool_.reset(new MemPool(expr_mem_tracker_.get()));
   rows_returned_counter_ = ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT);
   rows_returned_rate_ = runtime_profile()->AddDerivedCounter(
       ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
       bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_returned_counter_,
           runtime_profile()->total_time_counter()));
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(conjuncts_, state, pool_, expr_mem_pool(),
-      &conjunct_evals_));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(conjuncts_, state, pool_, expr_perm_pool(),
+      expr_results_pool(), &conjunct_evals_));
   DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
-  AddEvaluatorsToFree(conjunct_evals_);
   for (int i = 0; i < children_.size(); ++i) {
     RETURN_IF_ERROR(children_[i]->Prepare(state));
   }
@@ -206,7 +206,8 @@ void ExecNode::Close(RuntimeState* state) {
 
   ScalarExprEvaluator::Close(conjunct_evals_, state);
   ScalarExpr::Close(conjuncts_);
-  if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll();
+  if (expr_perm_pool() != nullptr) expr_perm_pool_->FreeAll();
+  if (expr_results_pool() != nullptr) expr_results_pool_->FreeAll();
   if (buffer_pool_client_.is_registered()) {
     VLOG_FILE << id_ << " returning reservation " << resource_profile_.min_reservation;
     state->query_state()->initial_reservations()->Return(
@@ -501,18 +502,10 @@ bool ExecNode::EvalConjuncts(
 }
 
 Status ExecNode::QueryMaintenance(RuntimeState* state) {
-  ScalarExprEvaluator::FreeLocalAllocations(evals_to_free_);
+  expr_results_pool_->Clear();
   return state->CheckQueryState();
 }
 
-void ExecNode::AddEvaluatorToFree(ScalarExprEvaluator* eval) {
-  evals_to_free_.push_back(eval);
-}
-
-void ExecNode::AddEvaluatorsToFree(const vector<ScalarExprEvaluator*>& evals) {
-  for (ScalarExprEvaluator* eval : evals) AddEvaluatorToFree(eval);
-}
-
 void ExecNode::AddCodegenDisabledMessage(RuntimeState* state) {
   if (state->CodegenDisabledByQueryOption()) {
     runtime_profile()->AddCodegenMsg(false, "disabled by query option DISABLE_CODEGEN");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 7cba6ac..a0ed352 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -211,7 +211,8 @@ class ExecNode {
   RuntimeProfile* runtime_profile() { return runtime_profile_; }
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
   MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); }
-  MemPool* expr_mem_pool() { return expr_mem_pool_.get(); }
+  MemPool* expr_perm_pool() { return expr_perm_pool_.get(); }
+  MemPool* expr_results_pool() { return expr_results_pool_.get(); }
 
   /// Return true if codegen was disabled by the planner for this ExecNode. Does not
   /// check to see if codegen was enabled for the enclosing fragment.
@@ -318,12 +319,19 @@ class ExecNode {
   /// Account for peak memory used by this node
   boost::scoped_ptr<MemTracker> mem_tracker_;
 
-  /// MemTracker used by 'expr_mem_pool_'.
+  /// MemTracker used by 'expr_perm_pool_' and 'expr_results_pool_'.
   boost::scoped_ptr<MemTracker> expr_mem_tracker_;
 
-  /// MemPool for allocating data structures used by expression evaluators in this node.
-  /// Created in Prepare().
-  boost::scoped_ptr<MemPool> expr_mem_pool_;
+  /// MemPool for allocations made by expression evaluators in this node that are
+  /// "permanent" and live until Close() is called. Created in Prepare().
+  boost::scoped_ptr<MemPool> expr_perm_pool_;
+
+  /// MemPool for allocations made by expression evaluators in this node that hold
+  /// intermediate or final results of expression evaluation. Should be cleared
+  /// periodically to free accumulated memory. QueryMaintenance() clears this pool, but
+  /// it may be appropriate for ExecNode implementation to clear it at other points in
+  /// execution where the memory is not needed.
+  boost::scoped_ptr<MemPool> expr_results_pool_;
 
   /// Buffer pool client for this node. Initialized with the node's minimum reservation
   /// in ClaimBufferReservation(). After initialization, the client must hold onto at
@@ -365,22 +373,11 @@ class ExecNode {
     return ExecDebugActionImpl(phase, state);
   }
 
-  /// Frees any local allocations made by evals_to_free_ and returns the result of
-  /// state->CheckQueryState(). Nodes should call this periodically, e.g. once per input
-  /// row batch. This should not be called outside the main execution thread.
-  //
-  /// Nodes may override this to add extra periodic cleanup, e.g. freeing other local
-  /// allocations. ExecNodes overriding this function should return
-  /// ExecNode::QueryMaintenance().
-  virtual Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
-
-  /// Add an expr evaluator to have its local allocations freed by QueryMaintenance().
-  /// Exprs that are evaluated in the main execution thread should be added. Exprs
-  /// evaluated in a separate thread are generally not safe to add, since a local
-  /// allocation may be freed while it's being used. Rather than using this mechanism,
-  /// threads should call FreeLocalAllocations() on local evaluators periodically.
-  void AddEvaluatorToFree(ScalarExprEvaluator* eval);
-  void AddEvaluatorsToFree(const std::vector<ScalarExprEvaluator*>& evals);
+  /// Clears 'expr_results_pool_' and returns the result of state->CheckQueryState().
+  /// Nodes should call this periodically, e.g. once per input row batch. This should
+  /// not be called outside the main execution thread.
+  /// TODO: IMPALA-2399: replace QueryMaintenance() - see JIRA for more details.
+  Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
 
  private:
   /// Implementation of ExecDebugAction(). This is the slow path we take when there is
@@ -391,10 +388,6 @@ class ExecNode {
   /// Set in ExecNode::Close(). Used to make Close() idempotent. This is not protected
   /// by a lock, it assumes all calls to Close() are made by the same thread.
   bool is_closed_;
-
-  /// Expr evaluators whose local allocations are safe to free in the main execution
-  /// thread.
-  std::vector<ScalarExprEvaluator*> evals_to_free_;
 };
 
 inline bool ExecNode::EvalPredicate(ScalarExprEvaluator* eval, TupleRow* row) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/filter-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index ecf744a..a882365 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -68,10 +68,11 @@ void FilterStats::RegisterCounterGroup(const string& key) {
 }
 
 Status FilterContext::CloneFrom(const FilterContext& from, ObjectPool* pool,
-    RuntimeState* state, MemPool* mem_pool) {
+    RuntimeState* state, MemPool* expr_perm_pool, MemPool* expr_results_pool) {
   filter = from.filter;
   stats = from.stats;
-  return from.expr_eval->Clone(pool, state, mem_pool, &expr_eval);
+  return from.expr_eval->Clone(
+      pool, state, expr_perm_pool, expr_results_pool, &expr_eval);
 }
 
 bool FilterContext::Eval(TupleRow* row) const noexcept {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/filter-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.h b/be/src/exec/filter-context.h
index 81a3889..5c232f8 100644
--- a/be/src/exec/filter-context.h
+++ b/be/src/exec/filter-context.h
@@ -100,7 +100,7 @@ struct FilterContext {
   /// Clones this FilterContext for use in a multi-threaded context (i.e. by scanner
   /// threads).
   Status CloneFrom(const FilterContext& from, ObjectPool* pool, RuntimeState* state,
-      MemPool* mem_pool);
+      MemPool* expr_perm_pool, MemPool* expr_results_pool);
 
   /// Evaluates 'row' with 'expr_eval' with the resulting value being checked
   /// against runtime filter 'filter' for matches. Returns true if 'row' finds

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 0b99cbd..c7dbfe2 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -84,14 +84,14 @@ class HashTableTest : public testing::Test {
     ASSERT_OK(build_expr->Init(desc, nullptr));
     build_exprs_.push_back(build_expr);
     ASSERT_OK(ScalarExprEvaluator::Create(build_exprs_, nullptr, &pool_, &mem_pool_,
-        &build_expr_evals_));
+        &mem_pool_, &build_expr_evals_));
     ASSERT_OK(ScalarExprEvaluator::Open(build_expr_evals_, nullptr));
 
     ScalarExpr* probe_expr = pool_.Add(new SlotRef(TYPE_INT, 1, true /* nullable */));
     ASSERT_OK(probe_expr->Init(desc, nullptr));
     probe_exprs_.push_back(probe_expr);
     ASSERT_OK(ScalarExprEvaluator::Create(probe_exprs_, nullptr, &pool_, &mem_pool_,
-        &probe_expr_evals_));
+        &mem_pool_, &probe_expr_evals_));
     ASSERT_OK(ScalarExprEvaluator::Open(probe_expr_evals_, nullptr));
 
     CreateTestEnv();
@@ -304,7 +304,8 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTableCtx> ht_ctx;
     EXPECT_OK(HashTableCtx::Create(&pool_, runtime_state_,
         build_exprs_, probe_exprs_, true /* stores_nulls_ */,
-        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx));
+        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &mem_pool_,
+        &mem_pool_, &ht_ctx));
     EXPECT_OK(ht_ctx->Open(runtime_state_));
 
     for (int i = 0; i < 2; ++i) {
@@ -342,7 +343,8 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
         probe_exprs_, false /* !stores_nulls_ */,
-        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx);
+        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &mem_pool_,
+        &mem_pool_, &ht_ctx);
     EXPECT_OK(status);
     EXPECT_OK(ht_ctx->Open(runtime_state_));
     bool success;
@@ -402,7 +404,8 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
         probe_exprs_, false /* !stores_nulls_ */,
-        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx);
+        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &mem_pool_,
+        &mem_pool_, &ht_ctx);
     EXPECT_OK(status);
     EXPECT_OK(ht_ctx->Open(runtime_state_));
 
@@ -466,7 +469,8 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
         probe_exprs_, false /* !stores_nulls_ */,
-        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx);
+        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &mem_pool_,
+        &mem_pool_, &ht_ctx);
     EXPECT_OK(status);
 
     // Inserts num_to_add + (num_to_add^2) + (num_to_add^4) + ... + (num_to_add^20)
@@ -553,7 +557,8 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
         probe_exprs_, false /* !stores_nulls_ */,
-        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx);
+        vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_,
+        &mem_pool_, &mem_pool_, &ht_ctx);
     EXPECT_OK(status);
 
     // Insert and probe table_size different tuples. All of them are expected to be
@@ -626,7 +631,7 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTableCtx> ht_ctx;
     Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
         probe_exprs_, false /* !stores_nulls_ */, vector<bool>(build_exprs_.size(), false), 1, 0, 1,
-        &mem_pool_, &ht_ctx);
+        &mem_pool_, &mem_pool_, &mem_pool_, &ht_ctx);
     EXPECT_OK(status);
     HashTable::Iterator iter = hash_table->Begin(ht_ctx.get());
     EXPECT_TRUE(iter.AtEnd());
@@ -712,7 +717,8 @@ TEST_F(HashTableTest, HashEmpty) {
   scoped_ptr<HashTableCtx> ht_ctx;
   Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
       probe_exprs_, false /* !stores_nulls_ */,
-      vector<bool>(build_exprs_.size(), false), 1, 2, 1, &mem_pool_, &ht_ctx);
+      vector<bool>(build_exprs_.size(), false), 1, 2, 1, &mem_pool_, &mem_pool_,
+      &mem_pool_, &ht_ctx);
   EXPECT_OK(status);
   EXPECT_OK(ht_ctx->Open(runtime_state_));
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index c9090b9..a1d9c57 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -97,7 +97,8 @@ static_assert(sizeof(NULL_VALUE) >= ColumnType::MAX_CHAR_LENGTH,
 HashTableCtx::HashTableCtx(const std::vector<ScalarExpr*>& build_exprs,
     const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls,
     const std::vector<bool>& finds_nulls, int32_t initial_seed,
-    int max_levels, MemPool* mem_pool)
+    int max_levels, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+    MemPool* probe_expr_results_pool)
     : build_exprs_(build_exprs),
       probe_exprs_(probe_exprs),
       stores_nulls_(stores_nulls),
@@ -106,7 +107,9 @@ HashTableCtx::HashTableCtx(const std::vector<ScalarExpr*>& build_exprs,
           finds_nulls_.begin(), finds_nulls_.end(), false, std::logical_or<bool>())),
       level_(0),
       scratch_row_(NULL),
-      mem_pool_(mem_pool) {
+      expr_perm_pool_(expr_perm_pool),
+      build_expr_results_pool_(build_expr_results_pool),
+      probe_expr_results_pool_(probe_expr_results_pool) {
   DCHECK(!finds_some_nulls_ || stores_nulls_);
   // Compute the layout and buffer size to store the evaluated expr results
   DCHECK_EQ(build_exprs_.size(), probe_exprs_.size());
@@ -131,22 +134,24 @@ Status HashTableCtx::Init(ObjectPool* pool, RuntimeState* state, int num_build_t
     return Status(Substitute("Failed to allocate $0 bytes for scratch row of "
         "HashTableCtx.", scratch_row_size));
   }
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(build_exprs_, state, pool, mem_pool_,
-      &build_expr_evals_));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(build_exprs_, state, pool, expr_perm_pool_,
+      build_expr_results_pool_, &build_expr_evals_));
   DCHECK_EQ(build_exprs_.size(), build_expr_evals_.size());
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(probe_exprs_, state, pool, mem_pool_,
-      &probe_expr_evals_));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(probe_exprs_, state, pool, expr_perm_pool_,
+      probe_expr_results_pool_, &probe_expr_evals_));
   DCHECK_EQ(probe_exprs_.size(), probe_expr_evals_.size());
-  return expr_values_cache_.Init(state, mem_pool_->mem_tracker(), build_exprs_);
+  return expr_values_cache_.Init(state, expr_perm_pool_->mem_tracker(), build_exprs_);
 }
 
 Status HashTableCtx::Create(ObjectPool* pool, RuntimeState* state,
     const std::vector<ScalarExpr*>& build_exprs,
     const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls,
     const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
-    int num_build_tuples, MemPool* mem_pool, scoped_ptr<HashTableCtx>* ht_ctx) {
+    int num_build_tuples, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+    MemPool* probe_expr_results_pool, scoped_ptr<HashTableCtx>* ht_ctx) {
   ht_ctx->reset(new HashTableCtx(build_exprs, probe_exprs, stores_nulls,
-      finds_nulls, initial_seed, max_levels, mem_pool));
+      finds_nulls, initial_seed, max_levels, expr_perm_pool,
+      build_expr_results_pool, probe_expr_results_pool));
   return (*ht_ctx)->Init(pool, state, num_build_tuples);
 }
 
@@ -159,24 +164,11 @@ Status HashTableCtx::Open(RuntimeState* state) {
 void HashTableCtx::Close(RuntimeState* state) {
   free(scratch_row_);
   scratch_row_ = NULL;
-  expr_values_cache_.Close(mem_pool_->mem_tracker());
+  expr_values_cache_.Close(expr_perm_pool_->mem_tracker());
   ScalarExprEvaluator::Close(build_expr_evals_, state);
   ScalarExprEvaluator::Close(probe_expr_evals_, state);
 }
 
-void HashTableCtx::FreeBuildLocalAllocations() {
-  ScalarExprEvaluator::FreeLocalAllocations(build_expr_evals_);
-}
-
-void HashTableCtx::FreeProbeLocalAllocations() {
-  ScalarExprEvaluator::FreeLocalAllocations(probe_expr_evals_);
-}
-
-void HashTableCtx::FreeLocalAllocations() {
-  FreeBuildLocalAllocations();
-  FreeProbeLocalAllocations();
-}
-
 uint32_t HashTableCtx::Hash(const void* input, int len, uint32_t hash) const {
   /// Use CRC hash at first level for better performance. Switch to murmur hash at
   /// subsequent levels since CRC doesn't randomize well with different seed inputs.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index d764640..91fb968 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -118,7 +118,8 @@ class HashTableCtx {
       const std::vector<ScalarExpr*>& build_exprs,
       const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls,
       const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
-      int num_build_tuples, MemPool* mem_pool, boost::scoped_ptr<HashTableCtx>* ht_ctx);
+      int num_build_tuples, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+      MemPool* probe_expr_results_pool, boost::scoped_ptr<HashTableCtx>* ht_ctx);
 
   /// Initialize the build and probe expression evaluators.
   Status Open(RuntimeState* state);
@@ -126,13 +127,6 @@ class HashTableCtx {
   /// Call to cleanup any resources allocated by the expression evaluators.
   void Close(RuntimeState* state);
 
-  /// Free local allocations made by build and probe expression evaluators respectively.
-  void FreeBuildLocalAllocations();
-  void FreeProbeLocalAllocations();
-
-  /// Free local allocations of both build and probe expression evaluators.
-  void FreeLocalAllocations();
-
   void set_level(int level);
 
   int ALWAYS_INLINE level() const { return level_; }
@@ -398,9 +392,18 @@ class HashTableCtx {
   ///  - initial_seed: initial seed value to use when computing hashes for rows with
   ///        level 0. Other levels have their seeds derived from this seed.
   ///  - max_levels: the max lhashevels we will hash with.
-  ///  - mem_pool: the MemPool which the expression evaluators allocate from. Owned by the
-  ///        exec node which owns this hash table context. Memory usage of the expression
-  ///        value cache is charged against its MemTracker.
+  ///  - expr_perm_pool: the MemPool from which the expression evaluators make permanent
+  ///        allocations that live until Close(). Owned by the exec node which owns this
+  ///        hash table context. Memory usage of the expression value cache is charged
+  ///        against this MemPool's tracker.
+  ///  - build_expr_results_pool: the MemPool from which the expression evaluators make
+  ///        allocations to hold expression results. Cached build expression values may
+  ///        reference memory in this pool. Owned by the exec node which owns this hash
+  ///        table context.
+  ///  - probe_expr_results_pool: the MemPool from which the expression evaluators make
+  ///        allocations to hold expression results. Cached probe expression values may
+  ///        reference memory in this pool. Owned by the exec node which owns this hash
+  ///        table context.
   ///
   /// TODO: stores_nulls is too coarse: for a hash table in which some columns are joined
   ///       with '<=>' and others with '=', stores_nulls could distinguish between columns
@@ -409,7 +412,8 @@ class HashTableCtx {
   HashTableCtx(const std::vector<ScalarExpr*>& build_exprs,
       const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls,
       const std::vector<bool>& finds_nulls, int32_t initial_seed,
-      int max_levels, MemPool* mem_pool);
+      int max_levels, MemPool* expr_perm_pool, MemPool* build_expr_results_pool,
+      MemPool* probe_expr_results_pool);
 
   /// Allocate various buffers for storing expression evaluation results, hash values,
   /// null bits etc. Also allocate evaluators for the build and probe expressions and
@@ -513,9 +517,15 @@ class HashTableCtx {
   /// Scratch buffer to generate rows on the fly.
   TupleRow* scratch_row_;
 
-  /// MemPool for 'build_expr_evals_' and 'probe_expr_evals_' to allocate from.
-  /// Not owned.
-  MemPool* mem_pool_;
+  /// MemPool for 'build_expr_evals_' and 'probe_expr_evals_' to allocate expr-managed
+  /// memory from. Not owned.
+  MemPool* expr_perm_pool_;
+
+  /// MemPools for allocations by 'build_expr_evals_' and 'probe_expr_evals_' that hold
+  /// results of expr evaluation. Not owned. The owner of these pools is responsible for
+  /// clearing them when results from the respective expr evaluators are no longer needed.
+  MemPool* build_expr_results_pool_;
+  MemPool* probe_expr_results_pool_;
 };
 
 /// The hash table consists of a contiguous array of buckets that contain a pointer to the

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index c957793..f393d8d 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -69,7 +69,7 @@ Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_track
 
 Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
-  ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_);
+  expr_results_pool_->Clear();
   RETURN_IF_ERROR(state->CheckQueryState());
   // Since everything is set up just forward everything to the writer.
   RETURN_IF_ERROR(hbase_table_writer_->AppendRows(batch));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 57f1e24..6cfaee0 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -217,8 +217,8 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   // Clone the min/max statistics conjuncts.
   RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
-      expr_mem_pool_.get(), scan_node_->min_max_conjunct_evals(),
-      &min_max_conjunct_evals_));
+      expr_perm_pool_.get(), context_->expr_results_pool(),
+      scan_node_->min_max_conjunct_evals(), &min_max_conjunct_evals_));
 
   for (int i = 0; i < context->filter_ctxs().size(); ++i) {
     const FilterContext* ctx = &context->filter_ctxs()[i];
@@ -605,8 +605,8 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
     }
   }
 
-  // Free any local allocations accumulated during conjunct evaluation.
-  ScalarExprEvaluator::FreeLocalAllocations(min_max_conjunct_evals_);
+  // Free any expr result allocations accumulated during conjunct evaluation.
+  context_->expr_results_pool()->Clear();
   return Status::OK();
 }
 
@@ -918,9 +918,9 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
     bool column_has_match = false;
     for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) {
       if (dict_idx % 1024 == 0) {
-        // Don't let local allocations accumulate too much for large dictionaries or
+        // Don't let expr result allocations accumulate too much for large dictionaries or
         // many row groups.
-        ScalarExprEvaluator::FreeLocalAllocations(dict_filter_conjunct_evals);
+        context_->expr_results_pool()->Clear();
       }
       dictionary->GetValue(dict_idx, slot);
 
@@ -934,8 +934,8 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
         break;
       }
     }
-    // Free all local allocations now that we're done with the filter.
-    ScalarExprEvaluator::FreeLocalAllocations(dict_filter_conjunct_evals);
+    // Free all expr result allocations now that we're done with the filter.
+    context_->expr_results_pool()->Clear();
 
     if (!column_has_match) {
       // The column contains no value that matches the conjunct. The row group
@@ -1042,20 +1042,12 @@ Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
   // Store UDF error in thread local storage or make UDF return status so it can merge
   // with parse_status_.
   RETURN_IF_ERROR(state_->GetQueryStatus());
-  // Free local expr allocations made when evaluating conjuncts for this batch.
-  FreeLocalAllocationsForConjuncts();
+  // Clear expr result allocations for this thread to avoid accumulating too much
+  // memory from evaluating the scanner conjuncts.
+  context_->expr_results_pool()->Clear();
   return Status::OK();
 }
 
-void HdfsParquetScanner::FreeLocalAllocationsForConjuncts() {
-  for (const auto& kv: conjunct_evals_map_) {
-    ScalarExprEvaluator::FreeLocalAllocations(kv.second);
-  }
-  for (const FilterContext* filter_ctx : filter_ctxs_) {
-    filter_ctx->expr_eval->FreeLocalAllocations();
-  }
-}
-
 int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
   // This function must not be called when the output batch is already full. As long as
   // we always call CommitRows() after TransferScratchTuples(), the output batch can

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 754737d..5f67036 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -650,11 +650,6 @@ class HdfsParquetScanner : public HdfsScanner {
   /// no values that pass the relevant conjuncts, then the row group can be skipped.
   Status EvalDictionaryFilters(const parquet::RowGroup& row_group,
       bool* skip_row_group) WARN_UNUSED_RESULT;
-
-  /// Free local allocations made when evaluating conjuncts over each row. Does not free
-  /// local allocations made when evaluated conjuncts for row groups, pages, etc. Those
-  /// should be freed separately after they are evaluated.
-  void FreeLocalAllocationsForConjuncts();
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 5582ead..59d4ca2 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -142,14 +142,14 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
     } else {
       DCHECK(conjunct_evals_map_[entry.first].empty());
       RETURN_IF_ERROR(ScalarExprEvaluator::Create(entry.second, state, pool_,
-          expr_mem_pool(), &conjunct_evals_map_[entry.first]));
+          expr_perm_pool(), expr_results_pool(), &conjunct_evals_map_[entry.first]));
     }
   }
 
   // Prepare min max statistics conjuncts.
   if (min_max_tuple_id_ != -1) {
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(min_max_conjuncts_, state, pool_,
-        expr_mem_pool(), &min_max_conjunct_evals_));
+        expr_perm_pool(), expr_results_pool(), &min_max_conjunct_evals_));
   }
 
   // One-time initialization of state that is constant across scan ranges

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 53aa026..8d4efec 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -88,7 +88,8 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
     int64_t partition_id = metadata->partition_id;
     HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
     scanner_ctx_.reset(new ScannerContext(
-        runtime_state_, this, partition, scan_range_, filter_ctxs()));
+        runtime_state_, this, partition, scan_range_, filter_ctxs(),
+        expr_results_pool()));
     Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_);
     if (!status.ok()) {
       DCHECK(scanner_ == NULL);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 64eece3..6aeb6a9 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -386,16 +386,20 @@ void HdfsScanNode::ScannerThread() {
   // contexts as the embedded expression evaluators may allocate from it and MemPool
   // is not thread safe.
   MemPool filter_mem_pool(expr_mem_tracker());
+  MemPool expr_results_pool(expr_mem_tracker());
   vector<FilterContext> filter_ctxs;
   Status filter_status = Status::OK();
   for (auto& filter_ctx: filter_ctxs_) {
     FilterContext filter;
-    filter_status = filter.CloneFrom(filter_ctx, pool_, runtime_state_, &filter_mem_pool);
+    filter_status = filter.CloneFrom(filter_ctx, pool_, runtime_state_, &filter_mem_pool,
+        &expr_results_pool);
     if (!filter_status.ok()) break;
     filter_ctxs.push_back(filter);
   }
 
   while (!done_) {
+    // Prevent memory accumulating across scan ranges.
+    expr_results_pool.Clear();
     {
       // Check if we have enough resources (thread token and memory) to keep using
       // this thread.
@@ -435,7 +439,7 @@ void HdfsScanNode::ScannerThread() {
     if (status.ok() && scan_range != NULL) {
       // Got a scan range. Process the range end to end (in this thread).
       status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
-          scan_range);
+          &expr_results_pool, scan_range);
     }
 
     if (!status.ok()) {
@@ -477,6 +481,7 @@ exit:
   runtime_state_->resource_pool()->ReleaseThreadToken(false);
   for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
   filter_mem_pool.FreeAll();
+  expr_results_pool.FreeAll();
 }
 
 namespace {
@@ -492,8 +497,7 @@ bool FileFormatIsSequenceBased(THdfsFileFormat::type format) {
 }
 
 Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
-    DiskIoMgr::ScanRange* scan_range) {
-
+    MemPool* expr_results_pool, DiskIoMgr::ScanRange* scan_range) {
   DCHECK(scan_range != NULL);
 
   ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
@@ -519,7 +523,8 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     }
   }
 
-  ScannerContext context(runtime_state_, this, partition, scan_range, filter_ctxs);
+  ScannerContext context(
+      runtime_state_, this, partition, scan_range, filter_ctxs, expr_results_pool);
   scoped_ptr<HdfsScanner> scanner;
   Status status = CreateAndOpenScanner(partition, &context, &scanner);
   if (!status.ok()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 782f530..30435c2 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -166,7 +166,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
   /// in this split.
   Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
-      DiskIoMgr::ScanRange* scan_range) WARN_UNUSED_RESULT;
+      MemPool* expr_results_pool, DiskIoMgr::ScanRange* scan_range) WARN_UNUSED_RESULT;
 
   /// Returns true if there is enough memory (against the mem tracker limits) to
   /// have a scanner thread.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index e81a72a..e0f3016 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -60,7 +60,7 @@ const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner";
 HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
     : scan_node_(scan_node),
       state_(state),
-      expr_mem_pool_(new MemPool(scan_node->expr_mem_tracker())),
+      expr_perm_pool_(new MemPool(scan_node->expr_mem_tracker())),
       template_tuple_pool_(new MemPool(scan_node->mem_tracker())),
       tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
       data_buffer_pool_(new MemPool(scan_node->mem_tracker())) {
@@ -84,7 +84,7 @@ Status HdfsScanner::Open(ScannerContext* context) {
   // caller.
   for (const auto& entry: scan_node_->conjuncts_map()) {
     RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, scan_node_->runtime_state(),
-        expr_mem_pool_.get(), entry.second,
+        expr_perm_pool_.get(), context_->expr_results_pool(), entry.second,
         &conjunct_evals_map_[entry.first]));
   }
   DCHECK(conjunct_evals_map_.find(scan_node_->tuple_desc()->id()) !=
@@ -145,7 +145,8 @@ void HdfsScanner::CloseInternal() {
   for (auto& entry : conjunct_evals_map_) {
     ScalarExprEvaluator::Close(entry.second, state_);
   }
-  expr_mem_pool_->FreeAll();
+  expr_perm_pool_->FreeAll();
+  context_->expr_results_pool()->FreeAll();
   obj_pool_.Clear();
   stream_ = nullptr;
   context_->ClearStreams();
@@ -199,11 +200,9 @@ Status HdfsScanner::CommitRows(int num_rows, RowBatch* row_batch) {
   if (context_->cancelled()) return Status::CANCELLED;
   // Check for UDF errors.
   RETURN_IF_ERROR(state_->GetQueryStatus());
-  // Free local expr allocations for this thread to avoid accumulating too much
+  // Clear expr result allocations for this thread to avoid accumulating too much
   // memory from evaluating the scanner conjuncts.
-  for (const auto& entry: conjunct_evals_map_) {
-    ScalarExprEvaluator::FreeLocalAllocations(entry.second);
-  }
+  context_->expr_results_pool()->Clear();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index f2f3407..2005870 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -205,9 +205,9 @@ class HdfsScanner {
   /// Starts as false and is set to true in Close().
   bool is_closed_ = false;
 
-  /// MemPool used for expression evaluators in this scanner. Need to be local
-  /// to each scanner as MemPool is not thread safe.
-  boost::scoped_ptr<MemPool> expr_mem_pool_;
+  /// MemPool used for expr-managed memory in expression evaluators in this scanner.
+  /// Need to be local to each scanner as MemPool is not thread safe.
+  boost::scoped_ptr<MemPool> expr_perm_pool_;
 
   /// Clones of the conjuncts' evaluators in scan_node_->conjuncts_map().
   /// Each scanner has its own ScalarExprEvaluators so the conjuncts can be safely
@@ -311,7 +311,7 @@ class HdfsScanner {
 
   /// Commits 'num_rows' to 'row_batch'. Advances 'tuple_mem_' and 'tuple_' accordingly.
   /// Attaches completed resources from 'context_' to 'row_batch' if necessary.
-  /// Frees local expr allocations. Returns non-OK if 'context_' is cancelled or the
+  /// Frees expr result allocations. Returns non-OK if 'context_' is cancelled or the
   /// query status in 'state_' is non-OK.
   Status CommitRows(int num_rows, RowBatch* row_batch) WARN_UNUSED_RESULT;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index e488717..e96ceb7 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -94,7 +94,8 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
   unique_id_str_ = PrintId(state->fragment_instance_id(), "-");
   SCOPED_TIMER(profile()->total_time_counter());
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_exprs_, state,
-      state->obj_pool(), expr_mem_pool(), &partition_key_expr_evals_));
+      state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(),
+      &partition_key_expr_evals_));
 
   // TODO: Consider a system-wide random number generator, initialised in a single place.
   ptime now = microsec_clock::local_time();
@@ -599,8 +600,7 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple
 
 Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
-  ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_);
-  ScalarExprEvaluator::FreeLocalAllocations(partition_key_expr_evals_);
+  expr_results_pool_->Clear();
   RETURN_IF_ERROR(state->CheckQueryState());
   // We don't do any work for an empty batch.
   if (batch->num_rows() == 0) return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 9c9ce73..deb9c84 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -63,7 +63,8 @@ const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT";
 KuduScanner::KuduScanner(KuduScanNodeBase* scan_node, RuntimeState* state)
   : scan_node_(scan_node),
     state_(state),
-    expr_mem_pool_(new MemPool(scan_node->expr_mem_tracker())),
+    expr_perm_pool_(new MemPool(scan_node->expr_mem_tracker())),
+    expr_results_pool_(new MemPool(scan_node->expr_mem_tracker())),
     cur_kudu_batch_num_read_(0),
     last_alive_time_micros_(0) {
 }
@@ -74,8 +75,8 @@ Status KuduScanner::Open() {
     if (slot->type().type != TYPE_TIMESTAMP) continue;
     timestamp_slots_.push_back(slot);
   }
-  return ScalarExprEvaluator::Clone(&obj_pool_, state_, expr_mem_pool_.get(),
-      scan_node_->conjunct_evals(), &conjunct_evals_);
+  return ScalarExprEvaluator::Clone(&obj_pool_, state_, expr_perm_pool_.get(),
+      expr_results_pool_.get(), scan_node_->conjunct_evals(), &conjunct_evals_);
 }
 
 void KuduScanner::KeepKuduScannerAlive() {
@@ -131,7 +132,8 @@ Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) {
 void KuduScanner::Close() {
   if (scanner_) CloseCurrentClientScanner();
   ScalarExprEvaluator::Close(conjunct_evals_, state_);
-  expr_mem_pool_->FreeAll();
+  expr_perm_pool_->FreeAll();
+  expr_results_pool_->FreeAll();
 }
 
 Status KuduScanner::OpenNextScanToken(const string& scan_token)  {
@@ -245,7 +247,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
     // Move to the next tuple in the tuple buffer.
     *tuple_mem = next_tuple(*tuple_mem);
   }
-  ScalarExprEvaluator::FreeLocalAllocations(conjunct_evals_);
+  expr_results_pool_->Clear();
 
   // Check the status in case an error status was set during conjunct evaluation.
   return state_->GetQueryStatus();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/kudu-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index e327e7a..125881c 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -90,9 +90,14 @@ class KuduScanner {
   /// For objects which have the same life time as the scanner.
   ObjectPool obj_pool_;
 
-  /// MemPool used for expression evaluators in this scanner. Need to be local
-  /// to each scanner as MemPool is not thread safe.
-  boost::scoped_ptr<MemPool> expr_mem_pool_;
+  /// MemPool used for expr-managed allocations in expression evaluators in this scanner.
+  /// Need to be local to each scanner as MemPool is not thread safe.
+  boost::scoped_ptr<MemPool> expr_perm_pool_;
+
+  /// MemPool used for allocations by expression evaluators in this scanner that hold
+  /// results of expression evaluation. Need to be local to each scanner as MemPool is
+  /// not thread safe.
+  boost::scoped_ptr<MemPool> expr_results_pool_;
 
   /// The kudu::client::KuduScanner for the current scan token. A new KuduScanner is
   /// created for each scan token using KuduScanToken::DeserializeIntoScanner().

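The kudu-scanner.h comment above captures the split that recurs throughout the patch: a "perm" pool for allocations that must live as long as the evaluators (freed once, in Close()), and a "results" pool for expression results that is cleared per batch. Below is a sketch of those two lifetimes using std::pmr::monotonic_buffer_resource as a stand-in for MemPool; the ScannerSketch type and the sizes are invented, and release() hands memory back upstream rather than recycling chunks the way MemPool::Clear() does, but the bulk-free pattern is the same.

    #include <cstdio>
    #include <memory_resource>

    // Hypothetical scanner shape, only to show where each pool is cleared/freed.
    class ScannerSketch {
     public:
      void Open() {
        // Evaluator setup state that must live for the scanner's lifetime
        // comes from the "perm" resource (the analogue of expr_perm_pool_).
        perm_.allocate(128);
      }

      void ProcessBatch() {
        // Per-row expression results come from the results resource...
        for (int row = 0; row < 1024; ++row) results_.allocate(16);
        // ...and are dropped in bulk once the batch is handed off
        // (the analogue of expr_results_pool_->Clear()).
        results_.release();
      }

      void Close() {
        // Both are torn down when the scanner closes (the analogue of FreeAll()).
        perm_.release();
        results_.release();
      }

     private:
      std::pmr::monotonic_buffer_resource perm_;
      std::pmr::monotonic_buffer_resource results_;
    };

    int main() {
      ScannerSketch scanner;
      scanner.Open();
      for (int i = 0; i < 3; ++i) scanner.ProcessBatch();
      scanner.Close();
      std::printf("done\n");
      return 0;
    }
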
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index f01a373..c5abf7b 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -213,7 +213,7 @@ kudu::client::KuduWriteOperation* KuduTableSink::NewWriteOp() {
 
 Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
-  ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_);
+  expr_results_pool_->Clear();
   RETURN_IF_ERROR(state->CheckQueryState());
   const KuduSchema& table_schema = table_->schema();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index e80d230..c5ecf53 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -66,7 +66,7 @@ Status NestedLoopJoinNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(BlockingJoinNode::Open(state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(join_conjunct_evals_, state));
 
-  // Check for errors and free local allocations before opening children.
+  // Check for errors and free expr result allocations before opening children.
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
 
@@ -99,7 +99,7 @@ Status NestedLoopJoinNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(join_conjuncts_, state,
-      pool_, expr_mem_pool(), &join_conjunct_evals_));
+      pool_, expr_perm_pool(), expr_results_pool(), &join_conjunct_evals_));
   builder_.reset(new NljBuilder(child(1)->row_desc(), state));
   RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
   runtime_profile()->PrependChild(builder_->profile());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partial-sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 107c29f..68d98a4 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -57,11 +57,10 @@ Status PartialSortNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 Status PartialSortNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
-  less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
-  sorter_.reset(new Sorter(*less_than_, sort_tuple_exprs_, &row_descriptor_,
-      mem_tracker(), &buffer_pool_client_, resource_profile_.spillable_buffer_size,
-      runtime_profile(), state, id(), false));
-  RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool()));
+  sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
+      sort_tuple_exprs_, &row_descriptor_, mem_tracker(), &buffer_pool_client_,
+      resource_profile_.spillable_buffer_size, runtime_profile(), state, id(), false));
+  RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   AddCodegenDisabledMessage(state);
   input_batch_.reset(
@@ -73,14 +72,13 @@ void PartialSortNode::Codegen(RuntimeState* state) {
   DCHECK(state->ShouldCodegen());
   ExecNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
-  Status codegen_status = less_than_->Codegen(state);
+  Status codegen_status = sorter_->Codegen(state);
   runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
 }
 
 Status PartialSortNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Open(state));
-  RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
   RETURN_IF_ERROR(child(0)->Open(state));
@@ -151,7 +149,6 @@ Status PartialSortNode::Reset(RuntimeState* state) {
 void PartialSortNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   child(0)->Close(state);
-  if (less_than_.get() != nullptr) less_than_->Close(state);
   if (sorter_ != nullptr) sorter_->Close(state);
   sorter_.reset();
   ScalarExpr::Close(ordering_exprs_);
@@ -160,11 +157,6 @@ void PartialSortNode::Close(RuntimeState* state) {
   ExecNode::Close(state);
 }
 
-Status PartialSortNode::QueryMaintenance(RuntimeState* state) {
-  sorter_->FreeLocalAllocations();
-  return ExecNode::QueryMaintenance(state);
-}
-
 void PartialSortNode::DebugString(int indentation_level, stringstream* out) const {
   *out << string(indentation_level * 2, ' ');
   *out << "PartialSortNode(" << ScalarExpr::DebugString(ordering_exprs_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partial-sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index d40d653..421df31 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -54,13 +54,9 @@ class PartialSortNode : public ExecNode {
   virtual void Close(RuntimeState* state);
 
  protected:
-  virtual Status QueryMaintenance(RuntimeState* state);
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
  private:
-  /// Compares tuples according to 'ordering_exprs'.
-  boost::scoped_ptr<TupleRowComparator> less_than_;
-
   /// Expressions and parameters used for tuple comparison.
   std::vector<ScalarExpr*> ordering_exprs_;
 

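The partial sort changes implement the commit-message point that the Sorter now owns its comparator, exprs and MemPool: PartialSortNode no longer constructs a TupleRowComparator, and Codegen()/Open()/Close() for the comparator route through the Sorter instead. A minimal sketch of that ownership move follows; the types here (OrderingExpr, Comparator, SorterSketch) are invented stand-ins, not the real Sorter interface.

    #include <cstdio>
    #include <memory>
    #include <utility>
    #include <vector>

    struct OrderingExpr { int slot; bool asc; };

    class Comparator {
     public:
      explicit Comparator(std::vector<OrderingExpr> exprs) : exprs_(std::move(exprs)) {}
      void Codegen() { std::printf("codegen %zu ordering exprs\n", exprs_.size()); }
     private:
      std::vector<OrderingExpr> exprs_;
    };

    // After the patch the sorter is handed the ordering exprs directly and builds its
    // own comparator, so the exec node no longer manages a separate less_than_ object.
    class SorterSketch {
     public:
      explicit SorterSketch(std::vector<OrderingExpr> ordering_exprs)
        : less_than_(new Comparator(std::move(ordering_exprs))) {}
      void Codegen() { less_than_->Codegen(); }
      void Close() { less_than_.reset(); }
     private:
      std::unique_ptr<Comparator> less_than_;
    };

    int main() {
      SorterSketch sorter({{0, true}, {1, false}});
      sorter.Codegen();  // was: the node calling less_than_->Codegen() directly
      sorter.Close();
      return 0;
    }
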
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 3824a72..4eaebb6 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -186,8 +186,7 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   state_ = state;
 
-  mem_pool_.reset(new MemPool(mem_tracker()));
-  agg_fn_pool_.reset(new MemPool(expr_mem_tracker()));
+  singleton_tuple_pool_.reset(new MemPool(mem_tracker()));
 
   ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime");
   get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime");
@@ -218,13 +217,14 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
         "MaxPartitionLevel", TUnit::UNIT);
   }
 
-  RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_, agg_fn_pool_.get(),
-      &agg_fn_evals_));
+  RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_, expr_perm_pool(),
+      expr_results_pool(), &agg_fn_evals_));
 
   if (!grouping_exprs_.empty()) {
     RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_,
         grouping_exprs_, true, vector<bool>(build_exprs_.size(), true),
-        state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_mem_pool(), &ht_ctx_));
+        state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_perm_pool(),
+        expr_results_pool(), expr_results_pool(), &ht_ctx_));
   }
   AddCodegenDisabledMessage(state);
   return Status::OK();
@@ -263,7 +263,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
     // Create the single output tuple for this non-grouping agg. This must happen after
     // opening the aggregate evaluators.
     singleton_output_tuple_ =
-        ConstructSingletonOutputTuple(agg_fn_evals_, mem_pool_.get());
+        ConstructSingletonOutputTuple(agg_fn_evals_, singleton_tuple_pool_.get());
     // Check for failures during AggFnEvaluator::Init().
     RETURN_IF_ERROR(state_->GetQueryStatus());
     singleton_output_tuple_returned_ = false;
@@ -343,62 +343,8 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch,
-    bool* eos) {
-  int first_row_idx = row_batch->num_rows();
-  RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos));
-  RETURN_IF_ERROR(HandleOutputStrings(row_batch, first_row_idx));
-  return Status::OK();
-}
-
-Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
-    int first_row_idx) {
-  if (!needs_finalize_ && !needs_serialize_) return Status::OK();
-  // String data returned by Serialize() or Finalize() is from local expr allocations in
-  // the agg function contexts, and will be freed on the next GetNext() call by
-  // FreeLocalAllocations(). The data either needs to be copied out now or sent up the
-  // plan and copied out by a blocking ancestor. (See IMPALA-3311)
-  for (const AggFn* agg_fn : agg_fns_) {
-    const SlotDescriptor& slot_desc = agg_fn->output_slot_desc();
-    DCHECK(!slot_desc.type().IsCollectionType()) << "producing collections NYI";
-    if (!slot_desc.type().IsVarLenStringType()) continue;
-    if (IsInSubplan()) {
-      // Copy string data to the row batch's pool. This is more efficient than
-      // MarkNeedsDeepCopy() in a subplan since we are likely producing many small
-      // batches.
-      RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch,
-          first_row_idx, row_batch->tuple_data_pool()));
-    } else {
-      row_batch->MarkNeedsDeepCopy();
-      break;
-    }
-  }
-  return Status::OK();
-}
-
-Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& slot_desc,
-    RowBatch* row_batch, int first_row_idx, MemPool* pool) {
-  DCHECK(slot_desc.type().IsVarLenStringType());
-  DCHECK_EQ(row_batch->row_desc()->tuple_descriptors().size(), 1);
-  FOREACH_ROW(row_batch, first_row_idx, batch_iter) {
-    Tuple* tuple = batch_iter.Get()->GetTuple(0);
-    StringValue* sv = reinterpret_cast<StringValue*>(
-        tuple->GetSlot(slot_desc.tuple_offset()));
-    if (sv == NULL || sv->len == 0) continue;
-    char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len));
-    if (UNLIKELY(new_ptr == NULL)) {
-      string details = Substitute("Cannot perform aggregation at node with id $0."
-          " Failed to allocate $1 output bytes.", id_, sv->len);
-      return pool->mem_tracker()->MemLimitExceeded(state_, details, sv->len);
-    }
-    memcpy(new_ptr, sv->ptr, sv->len);
-    sv->ptr = new_ptr;
-  }
-  return Status::OK();
-}
-
-Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
-    RowBatch* row_batch, bool* eos) {
+Status PartitionedAggregationNode::GetNext(
+    RuntimeState* state, RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
@@ -435,6 +381,11 @@ void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
   DCHECK(grouping_exprs_.empty());
   int row_idx = row_batch->AddRow();
   TupleRow* row = row_batch->GetRow(row_idx);
+  // The output row batch may reference memory allocated by Serialize() or Finalize().
+  // Allocating that memory directly from the row batch's pool means we can safely
+  // return the batch.
+  vector<ScopedResultsPool> allocate_from_batch_pool =
+      ScopedResultsPool::Create(agg_fn_evals_, row_batch->tuple_data_pool());
   Tuple* output_tuple = GetOutputTuple(agg_fn_evals_,
       singleton_output_tuple_, row_batch->tuple_data_pool());
   row->SetTuple(0, output_tuple);
@@ -446,7 +397,7 @@ void PartitionedAggregationNode::GetSingletonOutput(RowBatch* row_batch) {
   }
   // Keep the current chunk to amortize the memory allocation over a series
   // of Reset()/Open()/GetNext()* calls.
-  row_batch->tuple_data_pool()->AcquireData(mem_pool_.get(), true);
+  row_batch->tuple_data_pool()->AcquireData(singleton_tuple_pool_.get(), true);
   // This node no longer owns the memory for singleton_output_tuple_.
   singleton_output_tuple_ = NULL;
 }
@@ -471,10 +422,16 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
   }
 
   SCOPED_TIMER(get_results_timer_);
+
+  // The output row batch may reference memory allocated by Serialize() or Finalize().
+  // Allocating that memory directly from the row batch's pool means we can safely
+  // return the batch.
+  vector<ScopedResultsPool> allocate_from_batch_pool = ScopedResultsPool::Create(
+        output_partition_->agg_fn_evals, row_batch->tuple_data_pool());
   int count = 0;
   const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
   // Keep returning rows from the current partition.
-  while (!output_iterator_.AtEnd()) {
+  while (!output_iterator_.AtEnd() && !row_batch->AtCapacity()) {
     // This loop can go on for a long time if the conjuncts are very selective. Do query
     // maintenance every N iterations.
     if ((count++ & (N - 1)) == 0) {
@@ -493,9 +450,7 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
     if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), row)) {
       row_batch->CommitLastRow();
       ++num_rows_returned_;
-      if (ReachedLimit() || row_batch->AtCapacity()) {
-        break;
-      }
+      if (ReachedLimit()) break;
     }
   }
 
@@ -634,17 +589,22 @@ void PartitionedAggregationNode::CleanupHashTbl(
     // Finalize() requires a dst tuple but we don't actually need the result,
     // so allocate a single dummy tuple to avoid accumulating memory.
     Tuple* dummy_dst = NULL;
-    dummy_dst = Tuple::Create(output_tuple_desc_->byte_size(), mem_pool_.get());
+    dummy_dst = Tuple::Create(
+        output_tuple_desc_->byte_size(), singleton_tuple_pool_.get());
     while (!it.AtEnd()) {
       Tuple* tuple = it.GetTuple();
       AggFnEvaluator::Finalize(agg_fn_evals, tuple, dummy_dst);
       it.Next();
+      // Free any expr result allocations to prevent them accumulating excessively.
+      expr_results_pool_->Clear();
     }
   } else {
     while (!it.AtEnd()) {
       Tuple* tuple = it.GetTuple();
       AggFnEvaluator::Serialize(agg_fn_evals, tuple);
       it.Next();
+      // Free any expr result allocations to prevent them accumulating excessively.
+      expr_results_pool_->Clear();
     }
   }
 }
@@ -665,7 +625,7 @@ void PartitionedAggregationNode::Close(RuntimeState* state) {
   if (is_closed()) return;
 
   if (!singleton_output_tuple_returned_) {
-    GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, mem_pool_.get());
+    GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, singleton_tuple_pool_.get());
   }
 
   // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
@@ -682,8 +642,7 @@ void PartitionedAggregationNode::Close(RuntimeState* state) {
   // Close all the agg-fn-evaluators
   AggFnEvaluator::Close(agg_fn_evals_, state);
 
-  if (agg_fn_pool_.get() != nullptr) agg_fn_pool_->FreeAll();
-  if (mem_pool_.get() != nullptr) mem_pool_->FreeAll();
+  if (singleton_tuple_pool_.get() != nullptr) singleton_tuple_pool_->FreeAll();
   if (ht_ctx_.get() != nullptr) ht_ctx_->Close(state);
   ht_ctx_.reset();
   if (serialize_stream_.get() != nullptr) {
@@ -700,10 +659,10 @@ PartitionedAggregationNode::Partition::~Partition() {
 }
 
 Status PartitionedAggregationNode::Partition::InitStreams() {
-  agg_fn_pool.reset(new MemPool(parent->expr_mem_tracker()));
+  agg_fn_perm_pool.reset(new MemPool(parent->expr_mem_tracker()));
   DCHECK_EQ(agg_fn_evals.size(), 0);
-  AggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_pool.get(),
-      parent->agg_fn_evals_, &agg_fn_evals);
+  AggFnEvaluator::ShallowClone(parent->partition_pool_.get(), agg_fn_perm_pool.get(),
+      parent->expr_results_pool(), parent->agg_fn_evals_, &agg_fn_evals);
   // Varlen aggregate function results are stored outside of aggregated_row_stream because
   // BufferedTupleStream doesn't support relocating varlen data stored in the stream.
   auto agg_slot = parent->intermediate_tuple_desc_->slots().begin() +
@@ -829,9 +788,9 @@ Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
   AggFnEvaluator::Close(agg_fn_evals, parent->state_);
   agg_fn_evals.clear();
 
-  if (agg_fn_pool.get() != NULL) {
-    agg_fn_pool->FreeAll();
-    agg_fn_pool.reset();
+  if (agg_fn_perm_pool.get() != nullptr) {
+    agg_fn_perm_pool->FreeAll();
+    agg_fn_perm_pool.reset();
   }
 
   hash_tbl->Close();
@@ -875,7 +834,7 @@ void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
     unaggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
   for (AggFnEvaluator* eval : agg_fn_evals) eval->Close(parent->state_);
-  if (agg_fn_pool.get() != NULL) agg_fn_pool->FreeAll();
+  if (agg_fn_perm_pool.get() != nullptr) agg_fn_perm_pool->FreeAll();
 }
 
 Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple(
@@ -1346,7 +1305,7 @@ Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
     // Pass 'true' because we need to keep the write block pinned. See Partition::Spill().
     int64_t mem = hash_partitions_[i]->aggregated_row_stream->BytesPinned(true);
     mem += hash_partitions_[i]->hash_tbl->ByteSize();
-    mem += hash_partitions_[i]->agg_fn_pool->total_reserved_bytes();
+    mem += hash_partitions_[i]->agg_fn_perm_pool->total_reserved_bytes();
     DCHECK_GT(mem, 0); // At least the hash table buckets should occupy memory.
     if (mem > max_freed_mem) {
       max_freed_mem = mem;
@@ -1432,17 +1391,6 @@ void PartitionedAggregationNode::ClosePartitions() {
   partition_pool_->Clear();
 }
 
-Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
-  AggFnEvaluator::FreeLocalAllocations(agg_fn_evals_);
-  for (Partition* partition : hash_partitions_) {
-    if (partition != nullptr) {
-      AggFnEvaluator::FreeLocalAllocations(partition->agg_fn_evals);
-    }
-  }
-  if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations();
-  return ExecNode::QueryMaintenance(state);
-}
-
 // IR Generation for updating a single aggregation slot. Signature is:
 // void UpdateSlot(AggFnEvaluator* agg_expr_eval, AggTuple* agg_tuple, char** row)
 //

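The aggregation changes above remove the HandleOutputStrings()/CopyStringData() path: instead of copying Serialize()/Finalize() string results out of short-lived result allocations (or marking the batch for deep copy), the new code uses ScopedResultsPool to allocate those results straight from the output batch's MemPool so they live as long as the batch. ScopedResultsPool itself is not shown in this hunk; the sketch below only illustrates the swap-and-restore idea its use implies, with invented stand-in types (PoolSketch, EvaluatorSketch, ScopedResultsPoolSketch).

    #include <cstdio>

    // Hypothetical stand-ins; the real AggFnEvaluator/MemPool/ScopedResultsPool
    // are not shown in this hunk.
    struct PoolSketch { const char* name; };

    struct EvaluatorSketch {
      PoolSketch* results_pool;  // where Serialize()/Finalize() results are written
    };

    class ScopedResultsPoolSketch {
     public:
      ScopedResultsPoolSketch(EvaluatorSketch* eval, PoolSketch* batch_pool)
        : eval_(eval), saved_(eval->results_pool) {
        // Redirect result allocations to the output row batch's pool so string
        // results survive as long as the batch, without a later copy or
        // MarkNeedsDeepCopy().
        eval_->results_pool = batch_pool;
      }
      ~ScopedResultsPoolSketch() { eval_->results_pool = saved_; }  // restore on exit

     private:
      EvaluatorSketch* eval_;
      PoolSketch* saved_;
    };

    int main() {
      PoolSketch expr_results_pool{"expr_results_pool_"};
      PoolSketch batch_pool{"row batch tuple_data_pool"};
      EvaluatorSketch eval{&expr_results_pool};
      {
        ScopedResultsPoolSketch guard(&eval, &batch_pool);
        std::printf("Finalize() allocates from: %s\n", eval.results_pool->name);
      }
      std::printf("after scope, back to: %s\n", eval.results_pool->name);
      return 0;
    }
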
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 72354cc..c522c1b 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -144,8 +144,6 @@ class PartitionedAggregationNode : public ExecNode {
   static const char* LLVM_CLASS_NAME;
 
  protected:
-  /// Frees local allocations from aggregate_evals_ and agg_fn_evals
-  virtual Status QueryMaintenance(RuntimeState* state);
   virtual std::string DebugString(int indentation_level) const;
   virtual void DebugString(int indentation_level, std::stringstream* out) const;
 
@@ -213,13 +211,14 @@ class PartitionedAggregationNode : public ExecNode {
   /// The list of all aggregate operations for this exec node.
   std::vector<AggFn*> agg_fns_;
 
-  /// Evaluators for each aggregate function and backing MemPool. String data
-  /// returned by the aggregate functions is allocated via these evaluators.
-  /// These evaluatorss are only used for the non-grouping cases. For queries
-  /// with the group-by clause, each partition will clone these evaluators.
-  /// TODO: we really need to plumb through CHAR(N) for intermediate types.
+  /// Evaluators for each aggregate function. If this is a grouping aggregation, these
+  /// evaluators are only used to create cloned per-partition evaluators. The cloned
+  /// evaluators are then used to evaluate the functions. If this is a non-grouping
+  /// aggregation these evaluators are used directly to evaluate the functions.
+  ///
+  /// Permanent and result allocations for these evaluators come from
+  /// 'expr_perm_pool_' and 'expr_results_pool_' respectively.
   std::vector<AggFnEvaluator*> agg_fn_evals_;
-  boost::scoped_ptr<MemPool> agg_fn_pool_;
 
   /// Exprs used to evaluate input rows
   std::vector<ScalarExpr*> grouping_exprs_;
@@ -243,7 +242,7 @@ class PartitionedAggregationNode : public ExecNode {
   /// For non-grouping aggregations, the ownership of the pool's memory is transferred
   /// to the output batch on eos. The pool should not be Reset() to allow amortizing
   /// memory allocation over a series of Reset()/Open()/GetNext()* calls.
-  boost::scoped_ptr<MemPool> mem_pool_;
+  boost::scoped_ptr<MemPool> singleton_tuple_pool_;
 
   /// The current partition and iterator to the next row in its hash table that we need
   /// to return in GetNext()
@@ -415,9 +414,15 @@ class PartitionedAggregationNode : public ExecNode {
     /// is spilled or we are passing through all rows for this partition).
     boost::scoped_ptr<HashTable> hash_tbl;
 
-    /// Clone of parent's agg_fn_evals_ and backing MemPool.
+    /// Clone of parent's agg_fn_evals_. Permanent allocations come from
+    /// 'agg_fn_perm_pool' and result allocations come from the ExecNode's
+    /// 'expr_results_pool_'.
     std::vector<AggFnEvaluator*> agg_fn_evals;
-    boost::scoped_ptr<MemPool> agg_fn_pool;
+
+    /// Pool for permanent allocations for this partition's 'agg_fn_evals'. Freed at the
+    /// same times as 'agg_fn_evals' are closed: either when the partition is closed or
+    /// when it is spilled.
+    boost::scoped_ptr<MemPool> agg_fn_perm_pool;
 
     /// Tuple stream used to store aggregated rows. When the partition is not spilled,
     /// (meaning the hash table is maintained), this stream is pinned and contains the
@@ -447,20 +452,6 @@ class PartitionedAggregationNode : public ExecNode {
     return ht;
   }
 
-  /// Materializes 'row_batch' in either grouping or non-grouping case.
-  Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
-
-  /// Helper function called by GetNextInternal() to ensure that string data referenced in
-  /// 'row_batch' will live as long as 'row_batch's tuples. 'first_row_idx' indexes the
-  /// first row that should be processed in 'row_batch'.
-  Status HandleOutputStrings(RowBatch* row_batch, int first_row_idx);
-
-  /// Copies string data from the specified slot into 'pool', and sets the StringValues'
-  /// ptrs to the copied data. Copies data from all tuples in 'row_batch' from
-  /// 'first_row_idx' onwards. 'slot_desc' must have a var-len string type.
-  Status CopyStringData(const SlotDescriptor& slot_desc, RowBatch* row_batch,
-      int first_row_idx, MemPool* pool);
-
   /// Constructs singleton output tuple, allocating memory from pool.
   Tuple* ConstructSingletonOutputTuple(
       const std::vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c14a0904/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 6b7b791..4b49fb0 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -129,24 +129,17 @@ string PhjBuilder::GetName() {
   return Substitute("Hash Join Builder (join_node_id=$0)", join_node_id_);
 }
 
-void PhjBuilder::FreeLocalAllocations() const {
-  if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations();
-  for (const FilterContext& ctx : filter_ctxs_) {
-    if (ctx.expr_eval != nullptr) ctx.expr_eval->FreeLocalAllocations();
-  }
-}
-
 Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
-  RETURN_IF_ERROR(HashTableCtx::Create(&pool_, state, build_exprs_, build_exprs_,
+  RETURN_IF_ERROR(HashTableCtx::Create(&obj_pool_, state, build_exprs_, build_exprs_,
       HashTableStoresNulls(), is_not_distinct_from_, state->fragment_hash_seed(),
-      MAX_PARTITION_DEPTH, row_desc_->tuple_descriptors().size(), expr_mem_pool(),
-      &ht_ctx_));
+      MAX_PARTITION_DEPTH, row_desc_->tuple_descriptors().size(), expr_perm_pool_.get(),
+      expr_results_pool_.get(), expr_results_pool_.get(), &ht_ctx_));
 
   DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size());
   for (int i = 0; i < filter_exprs_.size(); ++i) {
-    RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, &pool_,
-        expr_mem_pool(), &filter_ctxs_[i].expr_eval));
+    RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, &obj_pool_,
+        expr_perm_pool_.get(), expr_results_pool_.get(), &filter_ctxs_[i].expr_eval));
   }
 
   partitions_created_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT);
@@ -210,8 +203,8 @@ Status PhjBuilder::Send(RuntimeState* state, RowBatch* batch) {
     }
   }
 
-  // Free any local allocations made during partitioning.
-  FreeLocalAllocations();
+  // Free any expr result allocations made during partitioning.
+  expr_results_pool_->Clear();
   COUNTER_ADD(num_build_rows_partitioned_, batch->num_rows());
   return Status::OK();
 }
@@ -266,7 +259,6 @@ Status PhjBuilder::FlushFinal(RuntimeState* state) {
 
 void PhjBuilder::Close(RuntimeState* state) {
   if (closed_) return;
-  FreeLocalAllocations();
   CloseAndDeletePartitions();
   if (ht_ctx_ != nullptr) ht_ctx_->Close(state);
   ht_ctx_.reset();
@@ -275,13 +267,13 @@ void PhjBuilder::Close(RuntimeState* state) {
   }
   ScalarExpr::Close(filter_exprs_);
   ScalarExpr::Close(build_exprs_);
-  pool_.Clear();
+  obj_pool_.Clear();
   DataSink::Close(state);
   closed_ = true;
 }
 
 void PhjBuilder::Reset() {
-  FreeLocalAllocations();
+  expr_results_pool_->Clear();
   non_empty_build_ = false;
   CloseAndDeletePartitions();
 }
@@ -712,8 +704,8 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) {
     }
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->GetQueryStatus());
-    // Free any local allocations made while inserting.
-    parent_->FreeLocalAllocations();
+    // Free any expr result allocations made while inserting.
+    parent_->expr_results_pool_->Clear();
     batch.Reset();
   } while (!eos);