You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/03/03 17:16:26 UTC

[impala] 01/05: IMPALA-4080 [part 5]: Invoke close on Exprs from plan nodes and sink configs where they were created

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8eba9471facde2a8e721a39d6b0ddc0273e5d0cb
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Tue Feb 25 13:02:53 2020 -0800

    IMPALA-4080 [part 5]: Invoke close on Exprs from plan nodes and sink
    configs where they were created
    
    Previously, the exprs created in plan nodes and sink configs were
    closed by the exec nodes and data sink instances that were created
    from them, when the fragment instance closed. This patch ensures that
    they are closed by the plan nodes and sink configs themselves, so that
    when these are shared among instances, one instance does not
    prematurely close expressions that the others are still using.
    
    Testing:
    Successfully passed exhaustive tests.
    
    Change-Id: I7b79d8b8f8e740609676dd13bd300a5514c65b0d
    Reviewed-on: http://gerrit.cloudera.org:8080/15318
    Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/aggregation-node-base.cc         |  7 +++++-
 be/src/exec/aggregation-node-base.h          |  1 +
 be/src/exec/aggregator.cc                    |  7 ++++--
 be/src/exec/aggregator.h                     | 13 +++++------
 be/src/exec/analytic-eval-node.cc            | 34 ++++++++++++----------------
 be/src/exec/analytic-eval-node.h             |  3 ++-
 be/src/exec/data-sink.cc                     | 26 +++++++++++----------
 be/src/exec/data-sink.h                      |  5 +++-
 be/src/exec/exchange-node.cc                 |  8 +++++--
 be/src/exec/exchange-node.h                  |  3 ++-
 be/src/exec/exec-node.cc                     |  9 +++++++-
 be/src/exec/exec-node.h                      |  3 +++
 be/src/exec/grouping-aggregator.cc           |  8 +++++--
 be/src/exec/grouping-aggregator.h            | 11 +++++----
 be/src/exec/hdfs-scan-node-base.cc           | 21 ++++++++++++-----
 be/src/exec/hdfs-scan-node-base.h            |  5 ++--
 be/src/exec/hdfs-table-sink.cc               |  6 ++++-
 be/src/exec/hdfs-table-sink.h                |  3 ++-
 be/src/exec/nested-loop-join-node.cc         | 11 +++++----
 be/src/exec/nested-loop-join-node.h          |  7 +++---
 be/src/exec/non-grouping-aggregator.h        |  3 +--
 be/src/exec/partial-sort-node.cc             | 18 +++++++++------
 be/src/exec/partial-sort-node.h              |  5 ++--
 be/src/exec/partitioned-hash-join-builder.cc | 10 +++++---
 be/src/exec/partitioned-hash-join-builder.h  |  8 ++++---
 be/src/exec/partitioned-hash-join-node.cc    | 11 ++++++---
 be/src/exec/partitioned-hash-join-node.h     |  9 ++++----
 be/src/exec/scan-node.cc                     |  1 -
 be/src/exec/sort-node.cc                     | 18 +++++++++------
 be/src/exec/sort-node.h                      |  5 ++--
 be/src/exec/topn-node.cc                     | 15 +++++++-----
 be/src/exec/topn-node.h                      |  5 ++--
 be/src/exec/union-node.cc                    | 16 ++++++++-----
 be/src/exec/union-node.h                     |  5 ++--
 be/src/exec/unnest-node.cc                   | 18 +++++++--------
 be/src/exec/unnest-node.h                    |  3 ++-
 be/src/runtime/data-stream-test.cc           |  2 +-
 be/src/runtime/fragment-instance-state.cc    |  6 ++---
 be/src/runtime/fragment-instance-state.h     |  6 ++---
 be/src/runtime/krpc-data-stream-sender.cc    |  6 ++++-
 be/src/runtime/krpc-data-stream-sender.h     |  3 ++-
 41 files changed, 225 insertions(+), 139 deletions(-)

diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc
index 3f8512a..2a12a3e 100644
--- a/be/src/exec/aggregation-node-base.cc
+++ b/be/src/exec/aggregation-node-base.cc
@@ -47,6 +47,11 @@ Status AggregationPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   return Status::OK();
 }
 
+void AggregationPlanNode::Close() {
+  for (AggregatorConfig* config : aggs_) config->Close();
+  PlanNode::Close();
+}
+
 Status AggregationPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
   if (tnode_->agg_node.aggregators[0].use_streaming_preaggregation) {
@@ -66,7 +71,7 @@ AggregationNodeBase::AggregationNodeBase(
   for (int i = 0; i < num_aggs; ++i) {
     const AggregatorConfig* agg = pnode.aggs_[i];
     unique_ptr<Aggregator> node;
-    if (agg->grouping_exprs_.empty()) {
+    if (agg->GetNumGroupingExprs() == 0) {
       const NonGroupingAggregatorConfig* non_grouping_config =
           static_cast<const NonGroupingAggregatorConfig*>(agg);
       node.reset(new NonGroupingAggregator(this, pool_, *non_grouping_config));
diff --git a/be/src/exec/aggregation-node-base.h b/be/src/exec/aggregation-node-base.h
index c5b480a..efdc964 100644
--- a/be/src/exec/aggregation-node-base.h
+++ b/be/src/exec/aggregation-node-base.h
@@ -28,6 +28,7 @@ namespace impala {
 class AggregationPlanNode : public PlanNode {
  public:
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
   ~AggregationPlanNode() {}
   /// Configuration for generating aggregators that will be eventually used to aggregate
diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc
index b5e88c7..1347670 100644
--- a/be/src/exec/aggregator.cc
+++ b/be/src/exec/aggregator.cc
@@ -73,6 +73,11 @@ Status AggregatorConfig::Init(
   return Status::OK();
 }
 
+void AggregatorConfig::Close() {
+  ScalarExpr::Close(conjuncts_);
+  AggFn::Close(aggregate_functions_);
+}
+
 const char* Aggregator::LLVM_CLASS_NAME = "class.impala::Aggregator";
 
 Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool,
@@ -122,9 +127,7 @@ Status Aggregator::Open(RuntimeState* state) {
 void Aggregator::Close(RuntimeState* state) {
   // Close all the agg-fn-evaluators
   AggFnEvaluator::Close(agg_fn_evals_, state);
-  AggFn::Close(agg_fns_);
   ScalarExprEvaluator::Close(conjunct_evals_, state);
-  ScalarExpr::Close(conjuncts_);
 
   if (expr_perm_pool_.get() != nullptr) expr_perm_pool_->FreeAll();
   if (expr_results_pool_.get() != nullptr) expr_results_pool_->FreeAll();
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index 15f43cd..3293c9d 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -63,6 +63,8 @@ class AggregatorConfig {
       const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx);
   virtual Status Init(
       const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode);
+  /// Closes the expressions created in Init();
+  virtual void Close();
   virtual Status Codegen(RuntimeState* state) = 0;
   virtual ~AggregatorConfig() {}
 
@@ -93,12 +95,11 @@ class AggregatorConfig {
 
   std::vector<ScalarExpr*> conjuncts_;
 
-  /// Exprs used to evaluate input rows
-  std::vector<ScalarExpr*> grouping_exprs_;
-
   /// The list of all aggregate operations for this aggregator.
   std::vector<AggFn*> aggregate_functions_;
 
+  virtual int GetNumGroupingExprs() const = 0;
+
  protected:
   /// Codegen for updating aggregate expressions aggregate_functions_[agg_fn_idx]
   /// and returns the IR function in 'fn'. Returns non-OK status if codegen
@@ -117,8 +118,6 @@ class AggregatorConfig {
 
   /// Codegen Aggregator::UpdateTuple(). Returns non-OK status if codegen is unsuccessful.
   Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) WARN_UNUSED_RESULT;
-
-  virtual int GetNumGroupingExprs() = 0;
 };
 
 /// Base class for aggregating rows. Used in the AggregationNode and
@@ -217,7 +216,7 @@ class Aggregator {
   const bool needs_finalize_;
 
   /// The list of all aggregate operations for this aggregator.
-  std::vector<AggFn*> agg_fns_;
+  const std::vector<AggFn*>& agg_fns_;
 
   /// Evaluators for each aggregate function. If this is a grouping aggregation, these
   /// evaluators are only used to create cloned per-partition evaluators. The cloned
@@ -231,7 +230,7 @@ class Aggregator {
   /// Conjuncts and their evaluators in this aggregator. 'conjuncts_' live in the
   /// query-state's object pool while the evaluators live in this aggregator's
   /// object pool.
-  std::vector<ScalarExpr*> conjuncts_;
+  const std::vector<ScalarExpr*>& conjuncts_;
   std::vector<ScalarExprEvaluator*> conjunct_evals_;
 
   /// Runtime profile for this aggregator. Owned by 'pool_'.
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index f8c3e6a..37d1d31 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -91,6 +91,13 @@ Status AnalyticEvalPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   return Status::OK();
 }
 
+void AnalyticEvalPlanNode::Close() {
+  AggFn::Close(analytic_fns_);
+  if (partition_by_eq_expr_ != nullptr) partition_by_eq_expr_->Close();
+  if (order_by_eq_expr_ != nullptr) order_by_eq_expr_->Close();
+  PlanNode::Close();
+}
+
 Status AnalyticEvalPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
   *node = pool->Add(
@@ -99,13 +106,16 @@ Status AnalyticEvalPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node
 }
 
 AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const AnalyticEvalPlanNode& pnode,
-      const TAnalyticNode& analytic_node, const DescriptorTbl& descs)
+    const TAnalyticNode& analytic_node, const DescriptorTbl& descs)
   : ExecNode(pool, pnode, descs),
     window_(analytic_node.window),
     intermediate_tuple_desc_(
         descs.GetTupleDescriptor(analytic_node.intermediate_tuple_id)),
-    result_tuple_desc_(
-        descs.GetTupleDescriptor(analytic_node.output_tuple_id)) {
+    result_tuple_desc_(descs.GetTupleDescriptor(analytic_node.output_tuple_id)),
+    partition_by_eq_expr_(pnode.partition_by_eq_expr_),
+    order_by_eq_expr_(pnode.order_by_eq_expr_),
+    analytic_fns_(pnode.analytic_fns_),
+    is_lead_fn_(pnode.is_lead_fn_) {
   if (analytic_node.__isset.buffered_tuple_id) {
     buffered_tuple_desc_ =
         descs.GetTupleDescriptor(analytic_node.buffered_tuple_id);
@@ -144,11 +154,6 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const AnalyticEvalPlanNode&
     }
   }
   VLOG_FILE << id() << " Window=" << DebugWindowString();
-  // Set the Exprs from the PlanNode
-  partition_by_eq_expr_ = pnode.partition_by_eq_expr_;
-  order_by_eq_expr_ = pnode.order_by_eq_expr_;
-  analytic_fns_ = pnode.analytic_fns_;
-  is_lead_fn_ = pnode.is_lead_fn_;
 }
 
 AnalyticEvalNode::~AnalyticEvalNode() {
@@ -879,18 +884,9 @@ void AnalyticEvalNode::Close(RuntimeState* state) {
     AggFnEvaluator::Finalize(analytic_fn_evals_, curr_tuple_, dummy_result_tuple_);
   }
   AggFnEvaluator::Close(analytic_fn_evals_, state);
-  AggFn::Close(analytic_fns_);
 
-  if (partition_by_eq_expr_ != nullptr) {
-    if (partition_by_eq_expr_eval_ != nullptr) {
-      partition_by_eq_expr_eval_->Close(state);
-    }
-    partition_by_eq_expr_->Close();
-  }
-  if (order_by_eq_expr_ != nullptr) {
-    if (order_by_eq_expr_eval_ != nullptr) order_by_eq_expr_eval_->Close(state);
-    order_by_eq_expr_->Close();
-  }
+  if (partition_by_eq_expr_eval_ != nullptr) partition_by_eq_expr_eval_->Close(state);
+  if (order_by_eq_expr_eval_ != nullptr) order_by_eq_expr_eval_->Close(state);
   if (curr_child_batch_.get() != nullptr) curr_child_batch_.reset();
   if (curr_tuple_pool_.get() != nullptr) curr_tuple_pool_->FreeAll();
   if (prev_tuple_pool_.get() != nullptr) prev_tuple_pool_->FreeAll();
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 0589cb4..d55f5f9 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -35,6 +35,7 @@ class ScalarExprEvaluator;
 class AnalyticEvalPlanNode : public PlanNode {
  public:
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
 
   ~AnalyticEvalPlanNode(){}
@@ -261,7 +262,7 @@ class AnalyticEvalNode : public ExecNode {
 
   /// Analytic functions and their evaluators. 'analytic_fns_' live in the query-state's
   /// objpool while the evaluators live in the exec node's objpool.
-  std::vector<AggFn*> analytic_fns_;
+  const std::vector<AggFn*>& analytic_fns_;
   std::vector<AggFnEvaluator*> analytic_fn_evals_;
 
   /// Indicates if each evaluator is the lead() fn. Used by ResetLeadFnSlots() to
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 3a35ae0..9ce4574 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -45,6 +45,10 @@ using strings::Substitute;
 
 namespace impala {
 
+void DataSinkConfig::Close() {
+  ScalarExpr::Close(output_exprs_);
+}
+
 Status DataSinkConfig::Init(
     const TDataSink& tsink, const RowDescriptor* input_row_desc, RuntimeState* state) {
   tsink_ = &tsink;
@@ -53,27 +57,27 @@ Status DataSinkConfig::Init(
 }
 
 Status DataSinkConfig::CreateConfig(const TDataSink& thrift_sink,
-    const RowDescriptor* row_desc, RuntimeState* state, const DataSinkConfig** sink) {
+    const RowDescriptor* row_desc, RuntimeState* state, DataSinkConfig** data_sink) {
   ObjectPool* pool = state->obj_pool();
-  DataSinkConfig* data_sink = nullptr;
+  *data_sink = nullptr;
   switch (thrift_sink.type) {
     case TDataSinkType::DATA_STREAM_SINK:
       if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream sink.");
       // TODO: figure out good buffer size based on size of output row
-      data_sink = pool->Add(new KrpcDataStreamSenderConfig());
+      *data_sink = pool->Add(new KrpcDataStreamSenderConfig());
       break;
     case TDataSinkType::TABLE_SINK:
       if (!thrift_sink.__isset.table_sink) return Status("Missing table sink.");
       switch (thrift_sink.table_sink.type) {
         case TTableSinkType::HDFS:
-          data_sink = pool->Add(new HdfsTableSinkConfig());
+          *data_sink = pool->Add(new HdfsTableSinkConfig());
           break;
         case TTableSinkType::KUDU:
           RETURN_IF_ERROR(CheckKuduAvailability());
-          data_sink = pool->Add(new KuduTableSinkConfig());
+          *data_sink = pool->Add(new KuduTableSinkConfig());
           break;
         case TTableSinkType::HBASE:
-          data_sink = pool->Add(new HBaseTableSinkConfig());
+          *data_sink = pool->Add(new HBaseTableSinkConfig());
           break;
         default:
           stringstream error_msg;
@@ -87,14 +91,14 @@ Status DataSinkConfig::CreateConfig(const TDataSink& thrift_sink,
       }
       break;
     case TDataSinkType::PLAN_ROOT_SINK:
-      data_sink = pool->Add(new PlanRootSinkConfig());
+      *data_sink = pool->Add(new PlanRootSinkConfig());
       break;
     case TDataSinkType::HASH_JOIN_BUILDER: {
-      data_sink = pool->Add(new PhjBuilderConfig());
+      *data_sink = pool->Add(new PhjBuilderConfig());
       break;
     }
     case TDataSinkType::NESTED_LOOP_JOIN_BUILDER: {
-      data_sink = pool->Add(new NljBuilderConfig());
+      *data_sink = pool->Add(new NljBuilderConfig());
       break;
     }
     default:
@@ -107,8 +111,7 @@ Status DataSinkConfig::CreateConfig(const TDataSink& thrift_sink,
       error_msg << str << " not implemented.";
       return Status(error_msg.str());
   }
-  RETURN_IF_ERROR(data_sink->Init(thrift_sink, row_desc, state));
-  *sink = data_sink;
+  RETURN_IF_ERROR((*data_sink)->Init(thrift_sink, row_desc, state));
   return Status::OK();
 }
 
@@ -159,7 +162,6 @@ Status DataSink::Open(RuntimeState* state) {
 void DataSink::Close(RuntimeState* state) {
   if (closed_) return;
   ScalarExprEvaluator::Close(output_expr_evals_, state);
-  ScalarExpr::Close(output_exprs_);
   if (expr_perm_pool_ != nullptr) expr_perm_pool_->FreeAll();
   if (expr_results_pool_.get() != nullptr) expr_results_pool_->FreeAll();
   if (expr_mem_tracker_ != nullptr) expr_mem_tracker_->Close();
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index d90ea99..8736692 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -57,6 +57,9 @@ class DataSinkConfig {
   virtual DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
     const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const = 0;
 
+  /// Close() releases all resources that were allocated during creation.
+  virtual void Close();
+
   /// Pointer to the thrift data sink struct associated with this sink. Set in Init() and
   /// owned by QueryState.
   const TDataSink* tsink_ = nullptr;
@@ -72,7 +75,7 @@ class DataSinkConfig {
   /// Creates a new data sink config, allocated in state->obj_pool() and returned through
   /// *sink, from the thrift sink object in fragment_ctx.
   static Status CreateConfig(const TDataSink& thrift_sink, const RowDescriptor* row_desc,
-      RuntimeState* state, const DataSinkConfig** sink);
+      RuntimeState* state, DataSinkConfig** sink);
 
  protected:
   /// Sets reference to TDataSink and initializes the expressions. Returns error status on
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index a3e0726..f840545 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -53,6 +53,11 @@ Status ExchangePlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   return Status::OK();
 }
 
+void ExchangePlanNode::Close() {
+  ScalarExpr::Close(ordering_exprs_);
+  PlanNode::Close();
+}
+
 Status ExchangePlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
   *node = pool->Add(new ExchangeNode(pool, *this, state->desc_tbl()));
@@ -70,6 +75,7 @@ ExchangeNode::ExchangeNode(
                             + pnode.tnode_->exchange_node.input_row_tuples.size())),
     next_row_idx_(0),
     is_merging_(pnode.tnode_->exchange_node.__isset.sort_info),
+    ordering_exprs_(pnode.ordering_exprs_),
     offset_(pnode.tnode_->exchange_node.__isset.offset ?
             pnode.tnode_->exchange_node.offset :
             0),
@@ -77,7 +83,6 @@ ExchangeNode::ExchangeNode(
   DCHECK_GE(offset_, 0);
   DCHECK(is_merging_ || (offset_ == 0));
   if (!is_merging_) return;
-  ordering_exprs_ = pnode.ordering_exprs_;
   is_asc_order_ = pnode.is_asc_order_;
   nulls_first_ = pnode.nulls_first_;
 }
@@ -151,7 +156,6 @@ void ExchangeNode::Close(RuntimeState* state) {
   if (less_than_.get() != nullptr) less_than_->Close(state);
   if (stream_recvr_ != nullptr) stream_recvr_->Close();
   ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&recvr_buffer_pool_client_);
-  ScalarExpr::Close(ordering_exprs_);
   ExecNode::Close(state);
 }
 
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index e0f459d..97b503b 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -34,6 +34,7 @@ class TupleRowComparator;
 class ExchangePlanNode : public PlanNode {
  public:
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
 
   ~ExchangePlanNode(){}
@@ -128,7 +129,7 @@ class ExchangeNode : public ExecNode {
   boost::scoped_ptr<TupleRowComparator> less_than_;
 
   /// Sort expressions and parameters passed to the merging receiver..
-  std::vector<ScalarExpr*> ordering_exprs_;
+  const std::vector<ScalarExpr*>& ordering_exprs_;
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
 
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 3b93172..54e3e04 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -85,6 +85,14 @@ Status PlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   return Status::OK();
 }
 
+void PlanNode::Close() {
+  ScalarExpr::Close(conjuncts_);
+  ScalarExpr::Close(runtime_filter_exprs_);
+  for (auto& child : children_) {
+    child->Close();
+  }
+}
+
 Status PlanNode::CreateTree(
       RuntimeState* state, const TPlan& plan, PlanNode** root) {
   if (plan.nodes.size() == 0) {
@@ -297,7 +305,6 @@ void ExecNode::Close(RuntimeState* state) {
   }
 
   ScalarExprEvaluator::Close(conjunct_evals_, state);
-  ScalarExpr::Close(conjuncts_);
   if (expr_perm_pool() != nullptr) expr_perm_pool_->FreeAll();
   if (expr_results_pool() != nullptr) expr_results_pool_->FreeAll();
   reservation_manager_.Close(state);
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index fa90ffb..ec74ebc 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -70,6 +70,9 @@ class PlanNode {
   /// Should only be called after all children have been set and Init()-ed.
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
 
+  /// Close() releases all resources that were allocated in Init().
+  virtual void Close();
+
   /// Create its corresponding exec node. Place exec node in state->obj_pool().
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const = 0;
 
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index e5b64b9..14aa6c1 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -119,6 +119,12 @@ Status GroupingAggregatorConfig::Init(
   return Status::OK();
 }
 
+void GroupingAggregatorConfig::Close() {
+  ScalarExpr::Close(build_exprs_);
+  ScalarExpr::Close(grouping_exprs_);
+  AggregatorConfig::Close();
+}
+
 static const int STREAMING_HT_MIN_REDUCTION_SIZE =
     sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
@@ -417,8 +423,6 @@ void GroupingAggregator::Close(RuntimeState* state) {
   if (serialize_stream_.get() != nullptr) {
     serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
-  ScalarExpr::Close(grouping_exprs_);
-  ScalarExpr::Close(build_exprs_);
   reservation_manager_.Close(state);
   if (reservation_tracker_ != nullptr) reservation_tracker_->Close();
   // Must be called after tuple_pool_ is freed, so that mem_tracker_ can be closed.
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index cc4c9c9..b4509bb 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -122,6 +122,7 @@ class GroupingAggregatorConfig : public AggregatorConfig {
       const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode, int agg_idx);
   Status Init(
       const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode) override;
+  void Close() override;
   Status Codegen(RuntimeState* state) override;
   ~GroupingAggregatorConfig() override {}
 
@@ -146,6 +147,9 @@ class GroupingAggregatorConfig : public AggregatorConfig {
   /// All the exprs are simply SlotRefs for the intermediate tuple.
   std::vector<ScalarExpr*> build_exprs_;
 
+  /// Exprs used to evaluate input rows
+  std::vector<ScalarExpr*> grouping_exprs_;
+
   /// Indices of grouping exprs with var-len string types in grouping_exprs_.
   /// We need to do more work for var-len expressions when allocating and spilling rows.
   /// All var-len grouping exprs have type string.
@@ -165,8 +169,7 @@ class GroupingAggregatorConfig : public AggregatorConfig {
   /// Jitted AddBatchStreamingImpl function pointer. Null if codegen is disabled.
   AddBatchStreamingImplFn add_batch_streaming_impl_fn_ = nullptr;
 
- protected:
-  int GetNumGroupingExprs() override { return grouping_exprs_.size(); }
+  int GetNumGroupingExprs() const override { return grouping_exprs_.size(); }
 
  private:
   /// Codegen the non-streaming add row batch loop. The loop has already been compiled to
@@ -264,11 +267,11 @@ class GroupingAggregator : public Aggregator {
   bool needs_serialize_ = false;
 
   /// Exprs used to evaluate input rows
-  std::vector<ScalarExpr*> grouping_exprs_;
+  const std::vector<ScalarExpr*>& grouping_exprs_;
 
   /// Exprs used to insert constructed aggregation tuple into the hash table.
   /// All the exprs are simply SlotRefs for the intermediate tuple.
-  std::vector<ScalarExpr*> build_exprs_;
+  const std::vector<ScalarExpr*>& build_exprs_;
 
   /// Indices of grouping exprs with var-len string types in grouping_exprs_.
   /// We need to do more work for var-len expressions when allocating and spilling rows.
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index faaf55c..984ca88 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -243,6 +243,17 @@ int HdfsScanPlanNode::GetMaterializedSlotIdx(const std::vector<int>& path) const
   return result->second;
 }
 
+void HdfsScanPlanNode::Close() {
+  TTupleId tuple_id = tnode_->hdfs_scan_node.tuple_id;
+  for (auto& tid_conjunct : conjuncts_map_) {
+    // PlanNode::conjuncts_ are already closed in PlanNode::Close()
+    if (tid_conjunct.first == tuple_id) continue;
+    ScalarExpr::Close(tid_conjunct.second);
+  }
+  ScalarExpr::Close(min_max_conjuncts_);
+  PlanNode::Close();
+}
+
 Status HdfsScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
   *node = pool->Add(tnode_->hdfs_scan_node.use_mt_scan_node ?
@@ -569,16 +580,14 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
   if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll();
 
   // Close collection conjuncts
-  for (auto& tid_conjunct: conjuncts_map_) {
-    // conjuncts_ are already closed in ExecNode::Close()
-    if (tid_conjunct.first == tuple_id_) continue;
-    ScalarExprEvaluator::Close(conjunct_evals_map_[tid_conjunct.first], state);
-    ScalarExpr::Close(tid_conjunct.second);
+  for (auto& tid_conjunct_eval : conjunct_evals_map_) {
+    // ExecNode::conjunct_evals_ are already closed in ExecNode::Close()
+    if (tid_conjunct_eval.first == tuple_id_) continue;
+    ScalarExprEvaluator::Close(tid_conjunct_eval.second, state);
   }
 
   // Close min max conjunct
   ScalarExprEvaluator::Close(min_max_conjunct_evals_, state);
-  ScalarExpr::Close(min_max_conjuncts_);
   ScanNode::Close(state);
 }
 
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index edd8cc3..0d8c5d2 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -102,6 +102,7 @@ struct ScanRangeMetadata {
 class HdfsScanPlanNode : public ScanPlanNode {
  public:
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
   void Codegen(RuntimeState* state, RuntimeProfile* profile);
 
@@ -456,7 +457,7 @@ class HdfsScanNodeBase : public ScanNode {
   const int min_max_tuple_id_;
 
   /// Conjuncts to evaluate on parquet::Statistics.
-  vector<ScalarExpr*> min_max_conjuncts_;
+  const vector<ScalarExpr*>& min_max_conjuncts_;
   vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
 
   /// Descriptor for the tuple used to evaluate conjuncts on parquet::Statistics.
@@ -517,7 +518,7 @@ class HdfsScanNodeBase : public ScanNode {
   /// Conjuncts for each materialized tuple (top-level row batch tuples and collection
   /// item tuples). Includes a copy of ExecNode.conjuncts_.
   typedef std::unordered_map<TupleId, std::vector<ScalarExpr*>> ConjunctsMap;
-  ConjunctsMap conjuncts_map_;
+  const ConjunctsMap& conjuncts_map_;
   ConjunctEvaluatorsMap conjunct_evals_map_;
 
   /// Dictionary filtering eligible conjuncts for each slot. Set to nullptr when there
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 05a00ee..7007b08 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -71,6 +71,11 @@ DataSink* HdfsTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
       new HdfsTableSink(sink_id, *this, this->tsink_->table_sink.hdfs_table_sink, state));
 }
 
+void HdfsTableSinkConfig::Close() {
+  ScalarExpr::Close(partition_key_exprs_);
+  DataSinkConfig::Close();
+}
+
 HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sink_config,
     const THdfsTableSink& hdfs_sink, RuntimeState* state)
   : DataSink(sink_id, sink_config, "HdfsTableSink", state),
@@ -703,7 +708,6 @@ void HdfsTableSink::Close(RuntimeState* state) {
   }
   partition_keys_to_output_partitions_.clear();
   ScalarExprEvaluator::Close(partition_key_expr_evals_, state);
-  ScalarExpr::Close(partition_key_exprs_);
   DataSink::Close(state);
   closed_ = true;
 }
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 0ea7009..ccfbd54 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -99,6 +99,7 @@ class HdfsTableSinkConfig : public DataSinkConfig {
   DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
       const TPlanFragmentInstanceCtx& fragment_instance_ctx,
       RuntimeState* state) const override;
+  void Close() override;
 
   /// Expressions for computing the target partitions to which a row is written.
   std::vector<ScalarExpr*> partition_key_exprs_;
@@ -319,7 +320,7 @@ class HdfsTableSink : public DataSink {
   PartitionMap partition_keys_to_output_partitions_;
 
   /// Expressions for computing the target partitions to which a row is written.
-  std::vector<ScalarExpr*> partition_key_exprs_;
+  const std::vector<ScalarExpr*>& partition_key_exprs_;
   std::vector<ScalarExprEvaluator*> partition_key_expr_evals_;
 
   /// Subset of partition_key_expr_evals_ which are not constant. Set in Prepare().
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index e5f67eb..f01d18c 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -50,6 +50,11 @@ Status NestedLoopJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* state)
   return Status::OK();
 }
 
+void NestedLoopJoinPlanNode::Close() {
+  ScalarExpr::Close(join_conjuncts_);
+  PlanNode::Close();
+}
+
 Status NestedLoopJoinPlanNode::CreateExecNode(
     RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
@@ -62,9 +67,8 @@ NestedLoopJoinNode::NestedLoopJoinNode(
   : BlockingJoinNode("NestedLoopJoinNode", pool, pnode, descs),
     build_batches_(NULL),
     current_build_row_idx_(0),
-    process_unmatched_build_rows_(false) {
-  join_conjuncts_ = pnode.join_conjuncts_;
-}
+    process_unmatched_build_rows_(false),
+    join_conjuncts_(pnode.join_conjuncts_) {}
 
 NestedLoopJoinNode::~NestedLoopJoinNode() {
   DCHECK(is_closed());
@@ -157,7 +161,6 @@ Status NestedLoopJoinNode::Reset(RuntimeState* state, RowBatch* row_batch) {
 void NestedLoopJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   ScalarExprEvaluator::Close(join_conjunct_evals_, state);
-  ScalarExpr::Close(join_conjuncts_);
   if (builder_ != NULL) {
     // IMPALA-6595: builder must be closed before child. The separate build case is
     // handled in FragmentInstanceState.
diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h
index 0af115f..437efe2 100644
--- a/be/src/exec/nested-loop-join-node.h
+++ b/be/src/exec/nested-loop-join-node.h
@@ -39,6 +39,7 @@ class NestedLoopJoinPlanNode : public BlockingJoinPlanNode {
   std::vector<ScalarExpr*> join_conjuncts_;
 
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
 
   ~NestedLoopJoinPlanNode(){}
@@ -78,7 +79,7 @@ class NestedLoopJoinNode : public BlockingJoinNode {
   RowBatchList::TupleRowIterator build_row_iterator_;
 
   /// Ordinal position of current_build_row_ [0, num_build_rows_).
-  int64_t current_build_row_idx_;
+  int64_t current_build_row_idx_ = 0;
 
   /// Bitmap used to identify matching build tuples for the case of OUTER/SEMI/ANTI
   /// joins. Owned exclusively by the nested loop join node.
@@ -87,13 +88,13 @@ class NestedLoopJoinNode : public BlockingJoinNode {
 
   /// If true, we've started processing the unmatched build rows. Only used in
   /// RIGHT OUTER JOIN, RIGHT ANTI JOIN and FULL OUTER JOIN modes.
-  bool process_unmatched_build_rows_;
+  bool process_unmatched_build_rows_ = false;
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 
   /// Join conjuncts
-  std::vector<ScalarExpr*> join_conjuncts_;
+  const std::vector<ScalarExpr*>& join_conjuncts_;
   std::vector<ScalarExprEvaluator*> join_conjunct_evals_;
 
   /// Optimized build for the case where the right child is a SingularRowSrcNode.
diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h
index 3ffda63..33b0091 100644
--- a/be/src/exec/non-grouping-aggregator.h
+++ b/be/src/exec/non-grouping-aggregator.h
@@ -49,8 +49,7 @@ class NonGroupingAggregatorConfig : public AggregatorConfig {
   /// Jitted AddBatchImpl function pointer. Null if codegen is disabled.
   AddBatchImplFn add_batch_impl_fn_ = nullptr;
 
- protected:
-  int GetNumGroupingExprs() override { return 0; }
+  int GetNumGroupingExprs() const override { return 0; }
 
  private:
   /// Codegen the non-streaming add row batch loop in NonGroupingAggregator::AddBatch()
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 1586f46..f74f6a5 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -43,6 +43,12 @@ Status PartialSortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   return Status::OK();
 }
 
+void PartialSortPlanNode::Close() {
+  ScalarExpr::Close(ordering_exprs_);
+  ScalarExpr::Close(sort_tuple_slot_exprs_);
+  PlanNode::Close();
+}
+
 Status PartialSortPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
   *node = pool->Add(new PartialSortNode(pool, *this, state->desc_tbl()));
@@ -52,15 +58,15 @@ Status PartialSortPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node)
 PartialSortNode::PartialSortNode(
     ObjectPool* pool, const PartialSortPlanNode& pnode, const DescriptorTbl& descs)
   : ExecNode(pool, pnode, descs),
+    ordering_exprs_(pnode.ordering_exprs_),
+    sort_tuple_exprs_(pnode.sort_tuple_slot_exprs_),
+    is_asc_order_(pnode.is_asc_order_),
+    nulls_first_(pnode.nulls_first_),
+    sorting_order_(pnode.sorting_order_),
     sorter_(nullptr),
     input_batch_index_(0),
     input_eos_(false),
     sorter_eos_(true) {
-  ordering_exprs_ = pnode.ordering_exprs_;
-  sort_tuple_exprs_ = pnode.sort_tuple_slot_exprs_;
-  is_asc_order_ = pnode.is_asc_order_;
-  nulls_first_ = pnode.nulls_first_;
-  sorting_order_ = pnode.sorting_order_;
   runtime_profile()->AddInfoString("SortType", "Partial");
 }
 
@@ -169,8 +175,6 @@ void PartialSortNode::Close(RuntimeState* state) {
   child(0)->Close(state);
   if (sorter_ != nullptr) sorter_->Close(state);
   sorter_.reset();
-  ScalarExpr::Close(ordering_exprs_);
-  ScalarExpr::Close(sort_tuple_exprs_);
   ExecNode::Close(state);
 }
 
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index 9cbe988..887e6a7 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -26,6 +26,7 @@ namespace impala {
 class PartialSortPlanNode : public PlanNode {
  public:
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
 
   ~PartialSortPlanNode(){}
@@ -78,11 +79,11 @@ class PartialSortNode : public ExecNode {
 
  private:
   /// Expressions and parameters used for tuple comparison.
-  std::vector<ScalarExpr*> ordering_exprs_;
+  const std::vector<ScalarExpr*>& ordering_exprs_;
 
   /// Expressions used to materialize slots in the tuples to be sorted.
   /// One expr per slot in the materialized tuple.
-  std::vector<ScalarExpr*> sort_tuple_exprs_;
+  const std::vector<ScalarExpr*>& sort_tuple_exprs_;
 
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index a5e3b2f..29874f0 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -73,7 +73,7 @@ Status PhjBuilderConfig::CreateConfig(RuntimeState* state, int join_node_id,
     TJoinOp::type join_op, const RowDescriptor* build_row_desc,
     const std::vector<TEqJoinCondition>& eq_join_conjuncts,
     const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed,
-    const PhjBuilderConfig** sink) {
+    PhjBuilderConfig** sink) {
   ObjectPool* pool = state->obj_pool();
   TDataSink* tsink = pool->Add(new TDataSink());
   PhjBuilderConfig* data_sink = pool->Add(new PhjBuilderConfig());
@@ -83,6 +83,12 @@ Status PhjBuilderConfig::CreateConfig(RuntimeState* state, int join_node_id,
   return Status::OK();
 }
 
+void PhjBuilderConfig::Close() {
+  ScalarExpr::Close(build_exprs_);
+  ScalarExpr::Close(filter_exprs_);
+  DataSinkConfig::Close();
+}
+
 Status PhjBuilderConfig::InitExprsAndFilters(RuntimeState* state,
     const vector<TEqJoinCondition>& eq_join_conjuncts,
     const vector<TRuntimeFilterDesc>& filter_descs) {
@@ -374,8 +380,6 @@ void PhjBuilder::Close(RuntimeState* state) {
   for (const FilterContext& ctx : filter_ctxs_) {
     if (ctx.expr_eval != nullptr) ctx.expr_eval->Close(state);
   }
-  ScalarExpr::Close(filter_exprs_);
-  ScalarExpr::Close(build_exprs_);
   obj_pool_.Clear();
   probe_stream_reservation_.Close();
   if (is_separate_build_) reservation_manager_.Close(state);
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 3a95482..5a7dc33 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -65,7 +65,9 @@ class PhjBuilderConfig : public JoinBuilderConfig {
       const RowDescriptor* build_row_desc,
       const std::vector<TEqJoinCondition>& eq_join_conjuncts,
       const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed,
-      const PhjBuilderConfig** sink);
+      PhjBuilderConfig** sink);
+
+  void Close() override;
 
   void Codegen(RuntimeState* state, RuntimeProfile* profile);
 
@@ -682,7 +684,7 @@ class PhjBuilder : public JoinBuilder {
   boost::scoped_ptr<Suballocator> ht_allocator_;
 
   /// Expressions over input rows for hash table build.
-  std::vector<ScalarExpr*> build_exprs_;
+  const std::vector<ScalarExpr*>& build_exprs_;
 
   /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS
   /// NOT DISTINCT FROM, rather than equality.
@@ -690,7 +692,7 @@ class PhjBuilder : public JoinBuilder {
 
   /// Expressions for evaluating input rows for insertion into runtime filters.
   /// Only includes exprs for filters produced by this builder.
-  std::vector<ScalarExpr*> filter_exprs_;
+  const std::vector<ScalarExpr*>& filter_exprs_;
 
   /// List of filters to build. One-to-one correspondence with exprs in 'filter_exprs_'.
   std::vector<FilterContext> filter_ctxs_;
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 617493f..05d4cb6 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -90,6 +90,14 @@ Status PartitionedHashJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* s
   return Status::OK();
 }
 
+void PartitionedHashJoinPlanNode::Close() {
+  ScalarExpr::Close(probe_exprs_);
+  ScalarExpr::Close(build_exprs_);
+  ScalarExpr::Close(other_join_conjuncts_);
+  if (phj_builder_config != nullptr) phj_builder_config->Close();
+  PlanNode::Close();
+}
+
 Status PartitionedHashJoinPlanNode::CreateExecNode(
     RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
@@ -312,9 +320,6 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
     }
   }
   ScalarExprEvaluator::Close(other_join_conjunct_evals_, state);
-  ScalarExpr::Close(build_exprs_);
-  ScalarExpr::Close(probe_exprs_);
-  ScalarExpr::Close(other_join_conjuncts_);
   if (probe_expr_results_pool_ != nullptr) probe_expr_results_pool_->FreeAll();
   BlockingJoinNode::Close(state);
 }
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index f68a022..8721d35 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -41,6 +41,7 @@ class TupleRow;
 class PartitionedHashJoinPlanNode : public BlockingJoinPlanNode {
  public:
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
   void Codegen(RuntimeState* state, RuntimeProfile* profile);
 
@@ -60,7 +61,7 @@ class PartitionedHashJoinPlanNode : public BlockingJoinPlanNode {
 
   /// Data sink config object for creating a phj builder that will be eventually used by
   /// the exec node.
-  const PhjBuilderConfig* phj_builder_config;
+  PhjBuilderConfig* phj_builder_config;
 
   /// Seed used for hashing rows.
   uint32_t hash_seed_;
@@ -551,11 +552,11 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// Our equi-join predicates "<lhs> = <rhs>" are separated into
   /// build_exprs_ (over the build input row) and probe_exprs_ (over child(0))
-  std::vector<ScalarExpr*> build_exprs_;
-  std::vector<ScalarExpr*> probe_exprs_;
+  const std::vector<ScalarExpr*>& build_exprs_;
+  const std::vector<ScalarExpr*>& probe_exprs_;
 
   /// Non-equi-join conjuncts from the ON clause.
-  std::vector<ScalarExpr*> other_join_conjuncts_;
+  const std::vector<ScalarExpr*>& other_join_conjuncts_;
   std::vector<ScalarExprEvaluator*> other_join_conjunct_evals_;
 
   /// Reference to the hash table config which is a part of the
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 0f135c4..267f624 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -201,7 +201,6 @@ void ScanNode::Close(RuntimeState* state) {
   for (auto& filter_ctx : filter_ctxs_) {
     if (filter_ctx.expr_eval != nullptr) filter_ctx.expr_eval->Close(state);
   }
-  ScalarExpr::Close(filter_exprs_);
   // ScanNode::Prepare() started periodic counters including 'total_throughput_counter_'
   // and 'bytes_read_timeseries_counter_'. Subclasses may also have started counters.
   runtime_profile_->StopPeriodicCounters();
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 5943fc0..b402bdb 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -42,6 +42,12 @@ Status SortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   return Status::OK();
 }
 
+void SortPlanNode::Close() {
+  ScalarExpr::Close(ordering_exprs_);
+  ScalarExpr::Close(sort_tuple_slot_exprs_);
+  PlanNode::Close();
+}
+
 Status SortPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
   *node = pool->Add(new SortNode(pool, *this, state->desc_tbl()));
@@ -52,13 +58,13 @@ SortNode::SortNode(
     ObjectPool* pool, const SortPlanNode& pnode, const DescriptorTbl& descs)
   : ExecNode(pool, pnode, descs),
     offset_(pnode.tnode_->sort_node.__isset.offset ? pnode.tnode_->sort_node.offset : 0),
+    ordering_exprs_(pnode.ordering_exprs_),
+    sort_tuple_exprs_(pnode.sort_tuple_slot_exprs_),
+    is_asc_order_(pnode.is_asc_order_),
+    nulls_first_(pnode.nulls_first_),
+    sorting_order_(pnode.sorting_order_),
     sorter_(NULL),
     num_rows_skipped_(0) {
-  ordering_exprs_ = pnode.ordering_exprs_;
-  sort_tuple_exprs_ = pnode.sort_tuple_slot_exprs_;
-  is_asc_order_ = pnode.is_asc_order_;
-  nulls_first_ = pnode.nulls_first_;
-  sorting_order_ = pnode.sorting_order_;
   runtime_profile()->AddInfoString("SortType", "Total");
 }
 
@@ -167,8 +173,6 @@ void SortNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   if (sorter_ != nullptr) sorter_->Close(state);
   sorter_.reset();
-  ScalarExpr::Close(ordering_exprs_);
-  ScalarExpr::Close(sort_tuple_exprs_);
   ExecNode::Close(state);
 }
 
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index 8de3c01..487e20f 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -26,6 +26,7 @@ namespace impala {
 class SortPlanNode : public PlanNode {
  public:
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
 
   ~SortPlanNode(){}
@@ -75,11 +76,11 @@ class SortNode : public ExecNode {
   int64_t offset_;
 
   /// Expressions and parameters used for tuple comparison.
-  std::vector<ScalarExpr*> ordering_exprs_;
+  const std::vector<ScalarExpr*>& ordering_exprs_;
 
   /// Expressions used to materialize slots in the tuples to be sorted.
   /// One expr per slot in the materialized tuple.
-  std::vector<ScalarExpr*> sort_tuple_exprs_;
+  const std::vector<ScalarExpr*>& sort_tuple_exprs_;
 
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index 7f21316..e482f24 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -54,6 +54,11 @@ Status TopNPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
       << "TopNNode should never have predicates to evaluate.";
   return Status::OK();
 }
+void TopNPlanNode::Close() {
+  ScalarExpr::Close(ordering_exprs_);
+  ScalarExpr::Close(output_tuple_exprs_);
+  PlanNode::Close();
+}
 
 Status TopNPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
@@ -65,6 +70,10 @@ TopNNode::TopNNode(
     ObjectPool* pool, const TopNPlanNode& pnode, const DescriptorTbl& descs)
   : ExecNode(pool, pnode, descs),
     offset_(pnode.tnode_->sort_node.__isset.offset ? pnode.tnode_->sort_node.offset : 0),
+    ordering_exprs_(pnode.ordering_exprs_),
+    output_tuple_exprs_(pnode.output_tuple_exprs_),
+    is_asc_order_(pnode.is_asc_order_),
+    nulls_first_(pnode.nulls_first_),
     output_tuple_desc_(row_descriptor_.tuple_descriptors()[0]),
     tuple_row_less_than_(NULL),
     tmp_tuple_(NULL),
@@ -73,10 +82,6 @@ TopNNode::TopNNode(
     rows_to_reclaim_(0),
     tuple_pool_reclaim_counter_(NULL),
     num_rows_skipped_(0) {
-  ordering_exprs_ = pnode.ordering_exprs_;
-  output_tuple_exprs_ = pnode.output_tuple_exprs_;
-  is_asc_order_ = pnode.is_asc_order_;
-  nulls_first_ = pnode.nulls_first_;
   runtime_profile()->AddInfoString("SortType", "TopN");
 }
 
@@ -241,8 +246,6 @@ void TopNNode::Close(RuntimeState* state) {
   if (tuple_pool_.get() != nullptr) tuple_pool_->FreeAll();
   if (tuple_row_less_than_.get() != nullptr) tuple_row_less_than_->Close(state);
   ScalarExprEvaluator::Close(output_tuple_expr_evals_, state);
-  ScalarExpr::Close(ordering_exprs_);
-  ScalarExpr::Close(output_tuple_exprs_);
   ExecNode::Close(state);
 }
 
diff --git a/be/src/exec/topn-node.h b/be/src/exec/topn-node.h
index 53abab9..77606f4 100644
--- a/be/src/exec/topn-node.h
+++ b/be/src/exec/topn-node.h
@@ -36,6 +36,7 @@ class Tuple;
 class TopNPlanNode : public PlanNode {
  public:
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
 
   ~TopNPlanNode(){}
@@ -106,10 +107,10 @@ class TopNNode : public ExecNode {
   int64_t offset_;
 
   /// Ordering expressions used for tuple comparison.
-  std::vector<ScalarExpr*> ordering_exprs_;
+  const std::vector<ScalarExpr*>& ordering_exprs_;
 
   /// Materialization exprs for the output tuple and their evaluators.
-  std::vector<ScalarExpr*> output_tuple_exprs_;
+  const std::vector<ScalarExpr*>& output_tuple_exprs_;
   std::vector<ScalarExprEvaluator*> output_tuple_expr_evals_;
 
   std::vector<bool> is_asc_order_;
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index 10c20ab..a929748 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -62,6 +62,16 @@ Status UnionPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   return Status::OK();
 }
 
+void UnionPlanNode::Close() {
+  for (const vector<ScalarExpr*>& const_exprs : const_exprs_lists_) {
+    ScalarExpr::Close(const_exprs);
+  }
+  for (const vector<ScalarExpr*>& child_exprs : child_exprs_lists_) {
+    ScalarExpr::Close(child_exprs);
+  }
+  PlanNode::Close();
+}
+
 Status UnionPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
   ObjectPool* pool = state->obj_pool();
   *node = pool->Add(new UnionNode(pool, *this, state->desc_tbl()));
@@ -354,11 +364,5 @@ void UnionNode::Close(RuntimeState* state) {
   for (const vector<ScalarExprEvaluator*>& evals : child_expr_evals_lists_) {
     ScalarExprEvaluator::Close(evals, state);
   }
-  for (const vector<ScalarExpr*>& const_exprs : const_exprs_lists_) {
-    ScalarExpr::Close(const_exprs);
-  }
-  for (const vector<ScalarExpr*>& child_exprs : child_exprs_lists_) {
-    ScalarExpr::Close(child_exprs);
-  }
   ExecNode::Close(state);
 }
diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h
index c5c7968..04bdcf4 100644
--- a/be/src/exec/union-node.h
+++ b/be/src/exec/union-node.h
@@ -40,6 +40,7 @@ class UnionNode;
 class UnionPlanNode : public PlanNode {
  public:
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
   void Codegen(RuntimeState* state, RuntimeProfile* runtime_profile);
 
@@ -105,11 +106,11 @@ class UnionNode : public ExecNode {
 
   /// Const exprs materialized by this node. These exprs don't refer to any children.
   /// Only materialized by the first fragment instance to avoid duplication.
-  std::vector<std::vector<ScalarExpr*>> const_exprs_lists_;
+  const std::vector<std::vector<ScalarExpr*>>& const_exprs_lists_;
   std::vector<std::vector<ScalarExprEvaluator*>> const_expr_evals_lists_;
 
   /// Exprs materialized by this node. The i-th result expr list refers to the i-th child.
-  std::vector<std::vector<ScalarExpr*>> child_exprs_lists_;
+  const std::vector<std::vector<ScalarExpr*>>& child_exprs_lists_;
   std::vector<std::vector<ScalarExprEvaluator*>> child_expr_evals_lists_;
 
   /// Reference to the codegened vector containing codegened function pointer owned by the
diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc
index 1dc8522..a20be34 100644
--- a/be/src/exec/unnest-node.cc
+++ b/be/src/exec/unnest-node.cc
@@ -37,6 +37,11 @@ Status UnnestPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   return Status::OK();
 }
 
+void UnnestPlanNode::Close() {
+  if (collection_expr_ != nullptr) collection_expr_->Close();
+  PlanNode::Close();
+}
+
 Status UnnestPlanNode::InitCollExpr(RuntimeState* state) {
   DCHECK(containing_subplan_ != nullptr)
       << "set_containing_subplan() must have been called";
@@ -66,10 +71,10 @@ UnnestNode::UnnestNode(
   : ExecNode(pool, pnode, descs),
     item_byte_size_(0),
     thrift_coll_expr_(pnode.tnode_->unnest_node.collection_expr),
-    coll_expr_(nullptr),
+    coll_expr_(pnode.collection_expr_),
     coll_expr_eval_(nullptr),
-    coll_slot_desc_(nullptr),
-    coll_tuple_idx_(-1),
+    coll_slot_desc_(pnode.coll_slot_desc_),
+    coll_tuple_idx_(pnode.coll_tuple_idx_),
     coll_value_(nullptr),
     item_idx_(0),
     num_collections_(0),
@@ -79,11 +84,7 @@ UnnestNode::UnnestNode(
     avg_collection_size_counter_(nullptr),
     max_collection_size_counter_(nullptr),
     min_collection_size_counter_(nullptr),
-    num_collections_counter_(nullptr) {
-  coll_expr_ = pnode.collection_expr_;
-  coll_slot_desc_ = pnode.coll_slot_desc_;
-  coll_tuple_idx_ = pnode.coll_tuple_idx_;
-}
+    num_collections_counter_(nullptr) {}
 
 Status UnnestNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
@@ -191,7 +192,6 @@ Status UnnestNode::Reset(RuntimeState* state, RowBatch* row_batch) {
 void UnnestNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   if (coll_expr_eval_ != nullptr) coll_expr_eval_->Close(state);
-  if (coll_expr_ != nullptr) coll_expr_->Close();
   ExecNode::Close(state);
 }
 
diff --git a/be/src/exec/unnest-node.h b/be/src/exec/unnest-node.h
index 86339bf..0b8bf6d 100644
--- a/be/src/exec/unnest-node.h
+++ b/be/src/exec/unnest-node.h
@@ -29,6 +29,7 @@ class TupleDescriptor;
 class UnnestPlanNode : public PlanNode {
  public:
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual void Close() override;
   virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
   /// Initializes the expression which produces the collection to be unnested.
   /// Called by the containing subplan plan-node.
@@ -39,7 +40,7 @@ class UnnestPlanNode : public PlanNode {
   /// Expr that produces the collection to be unnested. Currently always a SlotRef into an
   /// collection-typed slot. We do not evaluate this expr for setting coll_value_, but
   /// instead manually retrieve the slot value to support projection (see class comment).
-  ScalarExpr* collection_expr_;
+  ScalarExpr* collection_expr_ = nullptr;
 
   /// Descriptor of the collection-typed slot referenced by coll_expr_eval_. Set in
   /// Prepare().  This slot is always set to NULL in Open() as a simple projection.
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 23720ce..cca7f94 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -532,7 +532,7 @@ class DataStreamTest : public testing::Test {
     RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink& sink = GetSink(partition_type);
-    const DataSinkConfig* data_sink = nullptr;
+    DataSinkConfig* data_sink = nullptr;
     EXPECT_OK(DataSinkConfig::CreateConfig(sink, row_desc_, &state, &data_sink));
 
     // We create an object of the base class DataSink and cast to the appropriate sender
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 8f3292e..c5a1d50 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -174,10 +174,8 @@ Status FragmentInstanceState::Prepare() {
       bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
           runtime_state_->resource_pool()));
 
-  PlanNode* plan_tree = nullptr;
   RETURN_IF_ERROR(
-      PlanNode::CreateTree(runtime_state_, fragment_ctx_.fragment.plan, &plan_tree));
-  plan_tree_ = plan_tree;
+      PlanNode::CreateTree(runtime_state_, fragment_ctx_.fragment.plan, &plan_tree_));
   // set up plan
   RETURN_IF_ERROR(ExecNode::CreateTree(
       runtime_state_, *plan_tree_, query_state_->desc_tbl(), &exec_tree_));
@@ -423,6 +421,7 @@ void FragmentInstanceState::Close() {
 
   // guard against partially-finished Prepare()
   if (sink_ != nullptr) sink_->Close(runtime_state_);
+  if (sink_config_ != nullptr) sink_config_->Close();
 
   // Stop updating profile counters in background.
   profile()->StopPeriodicCounters();
@@ -430,6 +429,7 @@ void FragmentInstanceState::Close() {
   // Delete row_batch_ to free resources associated with it.
   row_batch_.reset();
   if (exec_tree_ != nullptr) exec_tree_->Close(runtime_state_);
+  if (plan_tree_ != nullptr) plan_tree_->Close();
   runtime_state_->ReleaseResources();
 
   // Sanity timer checks
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 921c273..20856be 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -163,9 +163,9 @@ class FragmentInstanceState {
   /// in Prepare().
   ExecNode* exec_tree_ = nullptr; // lives in obj_pool()
   RuntimeState* runtime_state_ = nullptr;  // lives in obj_pool()
-  /// Lives in obj_pool(). Not mutated after being initialized.
-  const PlanNode* plan_tree_ = nullptr;
-  const DataSinkConfig* sink_config_ = nullptr;
+  /// Lives in obj_pool(). Not mutated after being initialized except for being closed.
+  PlanNode* plan_tree_ = nullptr;
+  DataSinkConfig* sink_config_ = nullptr;
 
   /// A 'fake mutex' to detect any race condition in accessing 'report_seq_no_' below.
   /// There should be only one thread doing status report at the same time.
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 2b1f63a..d211986 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -98,6 +98,11 @@ DataSink* KrpcDataStreamSenderConfig::CreateSink(const TPlanFragmentCtx& fragmen
       fragment_ctx.destinations, FLAGS_data_stream_sender_buffer_size, state));
 }
 
+void KrpcDataStreamSenderConfig::Close() {
+  ScalarExpr::Close(partition_exprs_);
+  DataSinkConfig::Close();
+}
+
 // A datastream sender may send row batches to multiple destinations. There is one
 // channel for each destination.
 //
@@ -1074,7 +1079,6 @@ void KrpcDataStreamSender::Close(RuntimeState* state) {
     channels_[i]->Teardown(state);
   }
   ScalarExprEvaluator::Close(partition_expr_evals_, state);
-  ScalarExpr::Close(partition_exprs_);
   profile()->StopPeriodicCounters();
   DataSink::Close(state);
 }
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index 3c51534..e3801e6 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -45,6 +45,7 @@ class KrpcDataStreamSenderConfig : public DataSinkConfig {
   DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
       const TPlanFragmentInstanceCtx& fragment_instance_ctx,
       RuntimeState* state) const override;
+  void Close() override;
 
   void Codegen(RuntimeState* state, RuntimeProfile* profile);
 
@@ -213,7 +214,7 @@ class KrpcDataStreamSender : public DataSink {
 
   /// Expressions of partition keys. It's used to compute the
   /// per-row partition values for shuffling exchange;
-  std::vector<ScalarExpr*> partition_exprs_;
+  const std::vector<ScalarExpr*>& partition_exprs_;
   std::vector<ScalarExprEvaluator*> partition_expr_evals_;
 
   /// Time for serializing row batches.