You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/12/17 17:00:45 UTC

[impala] 03/04: IMPALA-4192: Move static state from ExecNode into a PlanNode

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f1c9f4c093bde9b6b8dc66c20021ab627c1842ae
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Fri Nov 15 16:36:18 2019 -0800

    IMPALA-4192: Move static state from ExecNode into a PlanNode
    
    This patch adds a new class called PlanNode which contains a subset
    of the static state of its corresponding ExecNode. There is one
    PlanNode instance per fragment, whereas ExecNode contains the runtime
    state and there can be up to MT_DOP instances of it per fragment.
    
    It also adds a similar class called AggregatorConfig which contains
    static state initialized from the thrift aggregator struct and is
    passed as an input to the Aggregator class's constructor.
    
    Eventually all static state including codegened function pointers
    would be moved to the PlanNodes.
    
    Testing:
    Ran exhaustive tests successfully.
    
    Change-Id: I69f1676bf67bac31fa5902511b3fcc269fd67472
    Reviewed-on: http://gerrit.cloudera.org:8080/14764
    Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/aggregation-node-base.cc      |  63 ++++--
 be/src/exec/aggregation-node-base.h       |  14 +-
 be/src/exec/aggregation-node.cc           |   8 +-
 be/src/exec/aggregation-node.h            |   3 +-
 be/src/exec/aggregator.cc                 |  60 +++---
 be/src/exec/aggregator.h                  |  46 ++++-
 be/src/exec/analytic-eval-node.cc         | 138 +++++++------
 be/src/exec/analytic-eval-node.h          |  26 ++-
 be/src/exec/blocking-join-node.cc         |  34 ++--
 be/src/exec/blocking-join-node.h          |  15 +-
 be/src/exec/cardinality-check-node.cc     |  13 +-
 be/src/exec/cardinality-check-node.h      |   9 +-
 be/src/exec/data-source-scan-node.cc      |   6 +-
 be/src/exec/data-source-scan-node.h       |   3 +-
 be/src/exec/empty-set-node.cc             |  11 +-
 be/src/exec/empty-set-node.h              |  10 +-
 be/src/exec/exchange-node.cc              |  50 +++--
 be/src/exec/exchange-node.h               |  18 +-
 be/src/exec/exec-node.cc                  | 317 +++++++++++++++---------------
 be/src/exec/exec-node.h                   | 118 ++++++++---
 be/src/exec/grouping-aggregator.cc        |  57 +++---
 be/src/exec/grouping-aggregator.h         |  45 ++++-
 be/src/exec/hbase-scan-node.cc            |  14 +-
 be/src/exec/hbase-scan-node.h             |   2 +-
 be/src/exec/hdfs-scan-node-base.cc        |  80 +++++---
 be/src/exec/hdfs-scan-node-base.h         |  19 +-
 be/src/exec/hdfs-scan-node-mt.cc          |   4 +-
 be/src/exec/hdfs-scan-node-mt.h           |   3 +-
 be/src/exec/hdfs-scan-node.cc             |   4 +-
 be/src/exec/hdfs-scan-node.h              |   3 +-
 be/src/exec/kudu-scan-node-base.cc        |  10 +-
 be/src/exec/kudu-scan-node-base.h         |   3 +-
 be/src/exec/kudu-scan-node-mt.cc          |   4 +-
 be/src/exec/kudu-scan-node-mt.h           |   2 +-
 be/src/exec/kudu-scan-node.cc             |   4 +-
 be/src/exec/kudu-scan-node.h              |   2 +-
 be/src/exec/nested-loop-join-node.cc      |  44 +++--
 be/src/exec/nested-loop-join-node.h       |  17 +-
 be/src/exec/non-grouping-aggregator.cc    |   8 +-
 be/src/exec/non-grouping-aggregator.h     |   5 +-
 be/src/exec/partial-sort-node.cc          |  46 +++--
 be/src/exec/partial-sort-node.h           |  23 ++-
 be/src/exec/partitioned-hash-join-node.cc |  73 ++++---
 be/src/exec/partitioned-hash-join-node.h  |  22 ++-
 be/src/exec/scan-node.cc                  |  71 +++++--
 be/src/exec/scan-node.h                   |  16 +-
 be/src/exec/select-node.cc                |  19 +-
 be/src/exec/select-node.h                 |   9 +-
 be/src/exec/singular-row-src-node.cc      |  11 +-
 be/src/exec/singular-row-src-node.h       |  11 +-
 be/src/exec/sort-node.cc                  |  43 ++--
 be/src/exec/sort-node.h                   |  22 ++-
 be/src/exec/streaming-aggregation-node.cc |  10 +-
 be/src/exec/streaming-aggregation-node.h  |   2 +-
 be/src/exec/subplan-node.cc               |  59 ++++--
 be/src/exec/subplan-node.h                |  27 ++-
 be/src/exec/topn-node.cc                  |  47 +++--
 be/src/exec/topn-node.h                   |  21 +-
 be/src/exec/union-node.cc                 |  51 +++--
 be/src/exec/union-node.h                  |  19 +-
 be/src/exec/unnest-node.cc                |  66 ++++---
 be/src/exec/unnest-node.h                 |  31 ++-
 be/src/runtime/fragment-instance-state.cc |   7 +-
 be/src/runtime/fragment-instance-state.h  |   4 +-
 64 files changed, 1320 insertions(+), 682 deletions(-)

diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc
index cd6cfe3..c2da44f 100644
--- a/be/src/exec/aggregation-node-base.cc
+++ b/be/src/exec/aggregation-node-base.cc
@@ -17,8 +17,10 @@
 
 #include "exec/aggregation-node-base.h"
 
+#include "exec/aggregation-node.h"
 #include "exec/grouping-aggregator.h"
 #include "exec/non-grouping-aggregator.h"
+#include "exec/streaming-aggregation-node.h"
 #include "runtime/runtime-state.h"
 #include "util/runtime-profile-counters.h"
 
@@ -26,33 +28,56 @@
 
 namespace impala {
 
-AggregationNodeBase::AggregationNodeBase(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs), replicate_input_(tnode.agg_node.replicate_input) {}
+Status AggregationPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(PlanNode::Init(tnode, state));
+  int num_ags = tnode_->agg_node.aggregators.size();
+  for (int i = 0; i < num_ags; ++i) {
+    const TAggregator& agg = tnode_->agg_node.aggregators[i];
+    AggregatorConfig* node = nullptr;
+    if (agg.grouping_exprs.empty()) {
+      node = state->obj_pool()->Add(new AggregatorConfig(agg, state, this));
+    } else {
+      node = state->obj_pool()->Add(new GroupingAggregatorConfig(agg, state, this));
+    }
+    DCHECK(node != nullptr);
+    aggs_.push_back(node);
+    RETURN_IF_ERROR(aggs_[i]->Init(agg, state, this));
+  }
+  DCHECK(aggs_.size() > 0);
+  return Status::OK();
+}
 
-Status AggregationNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
-  // The conjuncts will be evaluated in the Aggregator, so don't pass them to the
-  // ExecNode. TODO: remove this once we assign conjuncts directly to Aggregators.
-  TPlanNode tnode_no_conjuncts(tnode);
-  tnode_no_conjuncts.__set_conjuncts(std::vector<TExpr>());
-  RETURN_IF_ERROR(ExecNode::Init(tnode_no_conjuncts, state));
+Status AggregationPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  if (tnode_->agg_node.aggregators[0].use_streaming_preaggregation) {
+    *node = pool->Add(new StreamingAggregationNode(pool, *this, state->desc_tbl()));
+  } else {
+    *node = pool->Add(new AggregationNode(pool, *this, state->desc_tbl()));
+  }
+  return Status::OK();
+}
 
-  int num_ags = tnode.agg_node.aggregators.size();
-  for (int i = 0; i < num_ags; ++i) {
-    const TAggregator& agg = tnode.agg_node.aggregators[i];
+AggregationNodeBase::AggregationNodeBase(
+    ObjectPool* pool, const AggregationPlanNode& pnode, const DescriptorTbl& descs)
+  : ExecNode(pool, pnode, descs),
+    replicate_input_(pnode.tnode_->agg_node.replicate_input) {
+  // Create the Aggregator nodes from their configs.
+  int num_aggs = pnode.aggs_.size();
+  for (int i = 0; i < num_aggs; ++i) {
+    const AggregatorConfig* agg = pnode.aggs_[i];
     unique_ptr<Aggregator> node;
-    if (agg.grouping_exprs.empty()) {
-      node.reset(new NonGroupingAggregator(this, pool_, agg, state->desc_tbl(), i));
+    if (agg->grouping_exprs_.empty()) {
+      node.reset(new NonGroupingAggregator(this, pool_, *agg, i));
     } else {
-      node.reset(new GroupingAggregator(this, pool_, agg, state->desc_tbl(),
-          tnode.agg_node.estimated_input_cardinality, i));
+      const GroupingAggregatorConfig* grouping_config =
+          static_cast<const GroupingAggregatorConfig*>(agg);
+      DCHECK(grouping_config != nullptr);
+      node.reset(new GroupingAggregator(this, pool_, *grouping_config,
+          pnode.tnode_->agg_node.estimated_input_cardinality, i));
     }
     aggs_.push_back(std::move(node));
-    RETURN_IF_ERROR(aggs_[i]->Init(agg, state, tnode.conjuncts));
     runtime_profile_->AddChild(aggs_[i]->runtime_profile());
   }
-  DCHECK(aggs_.size() > 0);
-  return Status::OK();
 }
 
 Status AggregationNodeBase::Prepare(RuntimeState* state) {
diff --git a/be/src/exec/aggregation-node-base.h b/be/src/exec/aggregation-node-base.h
index 89508e0..c5b480a 100644
--- a/be/src/exec/aggregation-node-base.h
+++ b/be/src/exec/aggregation-node-base.h
@@ -25,14 +25,24 @@
 
 namespace impala {
 
+class AggregationPlanNode : public PlanNode {
+ public:
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+  ~AggregationPlanNode() {}
+  /// Configuration for generating aggregators that will be eventually used to aggregate
+  /// input rows by the exec node.
+  std::vector<AggregatorConfig*> aggs_;
+};
+
 /// Base class containing common code for the ExecNodes that do aggregation,
 /// AggregationNode and StreamingAggregationNode.
+
 class AggregationNodeBase : public ExecNode {
  public:
   AggregationNodeBase(
-      ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+      ObjectPool* pool, const AggregationPlanNode& pnode, const DescriptorTbl& descs);
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
   virtual Status Prepare(RuntimeState* state) override;
   virtual void Codegen(RuntimeState* state) override;
   virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index 9ce87d1..8db7925 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -34,10 +34,10 @@
 namespace impala {
 
 AggregationNode::AggregationNode(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : AggregationNodeBase(pool, tnode, descs) {
-  for (int i = 0; i < tnode.agg_node.aggregators.size(); ++i) {
-    DCHECK(!tnode.agg_node.aggregators[i].use_streaming_preaggregation);
+    ObjectPool* pool, const AggregationPlanNode& pnode, const DescriptorTbl& descs)
+  : AggregationNodeBase(pool, pnode, descs) {
+  for (auto& t_agg : pnode.tnode_->agg_node.aggregators) {
+    DCHECK(!t_agg.use_streaming_preaggregation);
   }
 }
 
diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h
index dfec577..91f29b0 100644
--- a/be/src/exec/aggregation-node.h
+++ b/be/src/exec/aggregation-node.h
@@ -32,7 +32,8 @@ class RuntimeState;
 /// Aggregator, which does the actual work of aggregating.
 class AggregationNode : public AggregationNodeBase {
  public:
-  AggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  AggregationNode(
+      ObjectPool* pool, const AggregationPlanNode& pnode, const DescriptorTbl& descs);
 
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
diff --git a/be/src/exec/aggregator.cc b/be/src/exec/aggregator.cc
index 5dfed29..939888f 100644
--- a/be/src/exec/aggregator.cc
+++ b/be/src/exec/aggregator.cc
@@ -41,46 +41,58 @@
 
 namespace impala {
 
-const char* Aggregator::LLVM_CLASS_NAME = "class.impala::Aggregator";
-
-Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool,
-    const TAggregator& taggregator, const DescriptorTbl& descs, const std::string& name,
-    int agg_idx)
-  : id_(exec_node->id()),
-    exec_node_(exec_node),
-    agg_idx_(agg_idx),
-    pool_(pool),
-    intermediate_tuple_id_(taggregator.intermediate_tuple_id),
-    intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)),
+AggregatorConfig::AggregatorConfig(
+    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode)
+  : intermediate_tuple_id_(taggregator.intermediate_tuple_id),
+    intermediate_tuple_desc_(
+        state->desc_tbl().GetTupleDescriptor(intermediate_tuple_id_)),
     output_tuple_id_(taggregator.output_tuple_id),
-    output_tuple_desc_(descs.GetTupleDescriptor(output_tuple_id_)),
-    row_desc_(*exec_node->row_desc()),
-    input_row_desc_(*exec_node->child(0)->row_desc()),
-    needs_finalize_(taggregator.need_finalize),
-    runtime_profile_(RuntimeProfile::Create(pool_, name)) {}
-
-Aggregator::~Aggregator() {}
+    output_tuple_desc_(state->desc_tbl().GetTupleDescriptor(output_tuple_id_)),
+    row_desc_(*pnode->row_descriptor_),
+    input_row_desc_(*pnode->children_[0]->row_descriptor_),
+    needs_finalize_(taggregator.need_finalize) {}
 
-Status Aggregator::Init(const TAggregator& taggregator, RuntimeState* state,
-    const std::vector<TExpr>& conjuncts) {
+Status AggregatorConfig::Init(
+    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode) {
   DCHECK(intermediate_tuple_desc_ != nullptr);
   DCHECK(output_tuple_desc_ != nullptr);
   DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size());
-
-  int j = GetNumGroupingExprs();
+  int j = taggregator.grouping_exprs.size();
   for (int i = 0; i < taggregator.aggregate_functions.size(); ++i, ++j) {
     SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
     SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
     AggFn* agg_fn;
     RETURN_IF_ERROR(AggFn::Create(taggregator.aggregate_functions[i], input_row_desc_,
         *intermediate_slot_desc, *output_slot_desc, state, &agg_fn));
-    agg_fns_.push_back(agg_fn);
+    aggregate_functions_.push_back(agg_fn);
   }
 
-  RETURN_IF_ERROR(ScalarExpr::Create(conjuncts, row_desc_, state, &conjuncts_));
+  RETURN_IF_ERROR(
+      ScalarExpr::Create(pnode->tnode_->conjuncts, row_desc_, state, &conjuncts_));
   return Status::OK();
 }
 
+const char* Aggregator::LLVM_CLASS_NAME = "class.impala::Aggregator";
+
+Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool,
+    const AggregatorConfig& config, const std::string& name, int agg_idx)
+  : id_(exec_node->id()),
+    exec_node_(exec_node),
+    agg_idx_(agg_idx),
+    pool_(pool),
+    intermediate_tuple_id_(config.intermediate_tuple_id_),
+    intermediate_tuple_desc_(config.intermediate_tuple_desc_),
+    output_tuple_id_(config.output_tuple_id_),
+    output_tuple_desc_(config.output_tuple_desc_),
+    row_desc_(config.row_desc_),
+    input_row_desc_(config.input_row_desc_),
+    needs_finalize_(config.needs_finalize_),
+    agg_fns_(config.aggregate_functions_),
+    conjuncts_(config.conjuncts_),
+    runtime_profile_(RuntimeProfile::Create(pool_, name)) {}
+
+Aggregator::~Aggregator() {}
+
 Status Aggregator::Prepare(RuntimeState* state) {
   mem_tracker_.reset(new MemTracker(
       runtime_profile_, -1, runtime_profile_->name(), exec_node_->mem_tracker()));
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index 7cb47c3..e2fd326 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -34,6 +34,7 @@ namespace impala {
 
 class AggFn;
 class AggFnEvaluator;
+class PlanNode;
 class CodegenAnyVal;
 class DescriptorTbl;
 class ExecNode;
@@ -53,6 +54,45 @@ class Tuple;
 class TupleDescriptor;
 class TupleRow;
 
+/// AggregatorConfig contains the static state initialized from its corresponding thrift
+/// structure. It serves as an input for creating instances of the Aggregator class.
+class AggregatorConfig {
+ public:
+  AggregatorConfig(
+      const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode);
+  virtual Status Init(
+      const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode);
+  virtual ~AggregatorConfig() {}
+
+  /// Tuple into which Update()/Merge()/Serialize() results are stored.
+  TupleId intermediate_tuple_id_;
+  TupleDescriptor* intermediate_tuple_desc_;
+
+  /// Tuple into which Finalize() results are stored. Possibly the same as
+  /// the intermediate tuple.
+  TupleId output_tuple_id_;
+  TupleDescriptor* output_tuple_desc_;
+
+  /// The RowDescriptor for the exec node this aggregator corresponds to.
+  const RowDescriptor& row_desc_;
+  /// The RowDescriptor for the child of the exec node this aggregator corresponds to.
+  const RowDescriptor& input_row_desc_;
+
+  /// Certain aggregates require a finalize step, which is the final step of the
+  /// aggregate after consuming all input rows. The finalize step converts the aggregate
+  /// value into its final form. This is true if this aggregator contains aggregate that
+  /// requires a finalize step.
+  const bool needs_finalize_;
+
+  std::vector<ScalarExpr*> conjuncts_;
+
+  /// Exprs used to evaluate input rows
+  std::vector<ScalarExpr*> grouping_exprs_;
+
+  /// The list of all aggregate operations for this aggregator.
+  std::vector<AggFn*> aggregate_functions_;
+};
+
 /// Base class for aggregating rows. Used in the AggregationNode and
 /// StreamingAggregationNode.
 ///
@@ -61,14 +101,12 @@ class TupleRow;
 class Aggregator {
  public:
   /// 'agg_idx' is the index of 'taggregator' in the parent TAggregationNode.
-  Aggregator(ExecNode* exec_node, ObjectPool* pool, const TAggregator& taggregator,
-      const DescriptorTbl& descs, const std::string& name, int agg_idx);
+  Aggregator(ExecNode* exec_node, ObjectPool* pool, const AggregatorConfig& config,
+      const std::string& name, int agg_idx);
   virtual ~Aggregator();
 
   /// Aggregators follow the same lifecycle as ExecNodes, except that after Open() and
   /// before GetNext() rows should be added with AddBatch(), followed by InputDone()[
-  virtual Status Init(const TAggregator& taggregator, RuntimeState* state,
-      const std::vector<TExpr>& conjuncts) WARN_UNUSED_RESULT;
   virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
   virtual void Codegen(RuntimeState* state) = 0;
   virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index b62f663..f8c3e6a 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -42,29 +42,85 @@ using namespace strings;
 
 namespace impala {
 
-AnalyticEvalNode::AnalyticEvalNode(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs),
-    window_(tnode.analytic_node.window),
+Status AnalyticEvalPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(PlanNode::Init(tnode, state));
+
+  const TAnalyticNode& analytic_node = tnode.analytic_node;
+  TupleDescriptor* intermediate_tuple_desc =
+      state->desc_tbl().GetTupleDescriptor(tnode.analytic_node.intermediate_tuple_id);
+  TupleDescriptor* result_tuple_desc =
+      state->desc_tbl().GetTupleDescriptor(tnode.analytic_node.output_tuple_id);
+  bool has_lead_fn = false;
+
+  for (int i = 0; i < analytic_node.analytic_functions.size(); ++i) {
+    AggFn* analytic_fn;
+    RETURN_IF_ERROR(AggFn::Create(analytic_node.analytic_functions[i],
+        *children_[0]->row_descriptor_, *(intermediate_tuple_desc->slots()[i]),
+        *(result_tuple_desc->slots()[i]), state, &analytic_fn));
+    analytic_fns_.push_back(analytic_fn);
+    DCHECK(!analytic_fn->is_merge());
+    const TFunction& fn = analytic_node.analytic_functions[i].nodes[0].fn;
+    const bool is_lead_fn = fn.name.function_name == "lead";
+    is_lead_fn_.push_back(is_lead_fn);
+    has_lead_fn |= is_lead_fn;
+  }
+  const TAnalyticWindow& window = tnode.analytic_node.window;
+  DCHECK(!has_lead_fn || !window.__isset.window_start);
+  DCHECK(window.__isset.window_end || !window.__isset.window_start)
+      << "UNBOUNDED FOLLOWING is only supported with UNBOUNDED PRECEDING.";
+
+  if (analytic_node.__isset.partition_by_eq || analytic_node.__isset.order_by_eq) {
+    DCHECK(analytic_node.__isset.buffered_tuple_id);
+    TupleDescriptor* buffered_tuple_desc =
+        state->desc_tbl().GetTupleDescriptor(tnode.analytic_node.buffered_tuple_id);
+    DCHECK(buffered_tuple_desc != nullptr);
+    vector<TTupleId> tuple_ids;
+    tuple_ids.push_back(children_[0]->row_descriptor_->tuple_descriptors()[0]->id());
+    tuple_ids.push_back(buffered_tuple_desc->id());
+    RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false));
+
+    if (analytic_node.__isset.partition_by_eq) {
+      RETURN_IF_ERROR(ScalarExpr::Create(
+          analytic_node.partition_by_eq, cmp_row_desc, state, &partition_by_eq_expr_));
+    }
+    if (analytic_node.__isset.order_by_eq) {
+      RETURN_IF_ERROR(ScalarExpr::Create(
+          analytic_node.order_by_eq, cmp_row_desc, state, &order_by_eq_expr_));
+    }
+  }
+  return Status::OK();
+}
+
+Status AnalyticEvalPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(
+      new AnalyticEvalNode(pool, *this, this->tnode_->analytic_node, state->desc_tbl()));
+  return Status::OK();
+}
+
+AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const AnalyticEvalPlanNode& pnode,
+      const TAnalyticNode& analytic_node, const DescriptorTbl& descs)
+  : ExecNode(pool, pnode, descs),
+    window_(analytic_node.window),
     intermediate_tuple_desc_(
-        descs.GetTupleDescriptor(tnode.analytic_node.intermediate_tuple_id)),
+        descs.GetTupleDescriptor(analytic_node.intermediate_tuple_id)),
     result_tuple_desc_(
-        descs.GetTupleDescriptor(tnode.analytic_node.output_tuple_id)) {
-  if (tnode.analytic_node.__isset.buffered_tuple_id) {
-    buffered_tuple_desc_ = descs.GetTupleDescriptor(
-        tnode.analytic_node.buffered_tuple_id);
+        descs.GetTupleDescriptor(analytic_node.output_tuple_id)) {
+  if (analytic_node.__isset.buffered_tuple_id) {
+    buffered_tuple_desc_ =
+        descs.GetTupleDescriptor(analytic_node.buffered_tuple_id);
   }
-  if (!tnode.analytic_node.__isset.window) {
+  if (!analytic_node.__isset.window) {
     fn_scope_ = AnalyticEvalNode::PARTITION;
-  } else if (tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
+  } else if (analytic_node.window.type == TAnalyticWindowType::RANGE) {
     fn_scope_ = AnalyticEvalNode::RANGE;
     DCHECK(!window_.__isset.window_start)
-      << "RANGE windows must have UNBOUNDED PRECEDING";
-    DCHECK(!window_.__isset.window_end ||
-        window_.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW)
-      << "RANGE window end bound must be CURRENT ROW or UNBOUNDED FOLLOWING";
+        << "RANGE windows must have UNBOUNDED PRECEDING";
+    DCHECK(!window_.__isset.window_end
+        || window_.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW)
+        << "RANGE window end bound must be CURRENT ROW or UNBOUNDED FOLLOWING";
   } else {
-    DCHECK_EQ(tnode.analytic_node.window.type, TAnalyticWindowType::ROWS);
+    DCHECK_EQ(analytic_node.window.type, TAnalyticWindowType::ROWS);
     fn_scope_ = AnalyticEvalNode::ROWS;
     if (window_.__isset.window_start) {
       TAnalyticWindowBoundary b = window_.window_start;
@@ -88,6 +144,11 @@ AnalyticEvalNode::AnalyticEvalNode(
     }
   }
   VLOG_FILE << id() << " Window=" << DebugWindowString();
+  // Set the Exprs from the PlanNode
+  partition_by_eq_expr_ = pnode.partition_by_eq_expr_;
+  order_by_eq_expr_ = pnode.order_by_eq_expr_;
+  analytic_fns_ = pnode.analytic_fns_;
+  is_lead_fn_ = pnode.is_lead_fn_;
 }
 
 AnalyticEvalNode::~AnalyticEvalNode() {
@@ -95,56 +156,13 @@ AnalyticEvalNode::~AnalyticEvalNode() {
   DCHECK(input_stream_ == nullptr || input_stream_->is_closed());
 }
 
-Status AnalyticEvalNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-  DCHECK_EQ(conjunct_evals_.size(), 0);
-  state_ = state;
-  const TAnalyticNode& analytic_node = tnode.analytic_node;
-  bool has_lead_fn = false;
-
-  for (int i = 0; i < analytic_node.analytic_functions.size(); ++i) {
-    AggFn* analytic_fn;
-    RETURN_IF_ERROR(AggFn::Create(analytic_node.analytic_functions[i],
-        *child(0)->row_desc(), *(intermediate_tuple_desc_->slots()[i]),
-        *(result_tuple_desc_->slots()[i]), state, &analytic_fn));
-    analytic_fns_.push_back(analytic_fn);
-    DCHECK(!analytic_fn->is_merge());
-    const TFunction& fn = analytic_node.analytic_functions[i].nodes[0].fn;
-    const bool is_lead_fn = fn.name.function_name == "lead";
-    is_lead_fn_.push_back(is_lead_fn);
-    has_lead_fn |= is_lead_fn;
-  }
-  DCHECK(!has_lead_fn || !window_.__isset.window_start);
-  DCHECK(fn_scope_ != PARTITION || analytic_node.order_by_exprs.empty());
-  DCHECK(window_.__isset.window_end || !window_.__isset.window_start)
-      << "UNBOUNDED FOLLOWING is only supported with UNBOUNDED PRECEDING.";
-
-  if (analytic_node.__isset.partition_by_eq || analytic_node.__isset.order_by_eq) {
-    DCHECK(analytic_node.__isset.buffered_tuple_id);
-    DCHECK(buffered_tuple_desc_ != nullptr);
-    vector<TTupleId> tuple_ids;
-    tuple_ids.push_back(child(0)->row_desc()->tuple_descriptors()[0]->id());
-    tuple_ids.push_back(buffered_tuple_desc_->id());
-    RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false));
-
-    if (analytic_node.__isset.partition_by_eq) {
-      RETURN_IF_ERROR(ScalarExpr::Create(analytic_node.partition_by_eq, cmp_row_desc,
-          state, &partition_by_eq_expr_));
-    }
-    if (analytic_node.__isset.order_by_eq) {
-      RETURN_IF_ERROR(ScalarExpr::Create(analytic_node.order_by_eq, cmp_row_desc,
-          state, &order_by_eq_expr_));
-    }
-  }
-  return Status::OK();
-}
-
 Status AnalyticEvalNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   DCHECK(child(0)->row_desc()->IsPrefixOf(*row_desc()));
   DCHECK_GE(resource_profile_.min_reservation,
       resource_profile_.spillable_buffer_size * MIN_REQUIRED_BUFFERS);
+  state_ = state;
   curr_tuple_pool_.reset(new MemPool(mem_tracker()));
   prev_tuple_pool_.reset(new MemPool(mem_tracker()));
   prev_input_tuple_pool_.reset(new MemPool(mem_tracker()));
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 3d295aa..0589cb4 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -32,6 +32,27 @@ class AggFnEvaluator;
 class ScalarExpr;
 class ScalarExprEvaluator;
 
+class AnalyticEvalPlanNode : public PlanNode {
+ public:
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+
+  ~AnalyticEvalPlanNode(){}
+
+  /// Analytic functions which live in the runtime-state's objpool.
+  std::vector<AggFn*> analytic_fns_;
+
+  /// Indicates if each evaluator is the lead() fn. Used by ResetLeadFnSlots() to
+  /// determine which slots need to be reset.
+  std::vector<bool> is_lead_fn_;
+
+  /// A predicate that checks if child tuple '<' buffered tuple for partitioning exprs.
+  ScalarExpr* partition_by_eq_expr_ = nullptr;
+
+  /// A predicate that checks if child tuple '<' buffered tuple for order by exprs.
+  ScalarExpr* order_by_eq_expr_ = nullptr;
+};
+
 /// Evaluates analytic functions with a single pass over input rows. It is assumed
 /// that the input has already been sorted on all of the partition exprs and then the
 /// order by exprs. If there is no order by clause or partition clause, the input is
@@ -61,12 +82,13 @@ class ScalarExprEvaluator;
 /// multiple rows have the same values for the order by exprs. The number of buffered
 /// rows may be an entire partition or even the entire input. Therefore, the output
 /// rows are buffered and may spill to disk via the BufferedTupleStream.
+
 class AnalyticEvalNode : public ExecNode {
  public:
-  AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  AnalyticEvalNode(ObjectPool* pool, const AnalyticEvalPlanNode& pnode,
+      const TAnalyticNode& analytic_node, const DescriptorTbl& descs);
   virtual ~AnalyticEvalNode();
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index a352c70..87cd7a0 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -40,9 +40,22 @@ using namespace impala;
 
 const char* BlockingJoinNode::LLVM_CLASS_NAME = "class.impala::BlockingJoinNode";
 
+Status BlockingJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(PlanNode::Init(tnode, state));
+  TJoinOp::type join_op;
+  if (tnode_->node_type == TPlanNodeType::HASH_JOIN_NODE) {
+    join_op = tnode.hash_join_node.join_op;
+  } else {
+    DCHECK(tnode_->node_type == TPlanNodeType::NESTED_LOOP_JOIN_NODE);
+    join_op = tnode.nested_loop_join_node.join_op;
+  }
+  DCHECK(!IsSemiJoin(join_op) || conjuncts_.size() == 0);
+  return Status::OK();
+}
+
 BlockingJoinNode::BlockingJoinNode(const string& node_name, const TJoinOp::type join_op,
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs),
+    ObjectPool* pool, const BlockingJoinPlanNode& pnode, const DescriptorTbl& descs)
+  : ExecNode(pool, pnode, descs),
     node_name_(node_name),
     join_op_(join_op),
     eos_(false),
@@ -52,18 +65,6 @@ BlockingJoinNode::BlockingJoinNode(const string& node_name, const TJoinOp::type
     semi_join_staging_row_(NULL) {
 }
 
-Status BlockingJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-  DCHECK(!IsSemiJoin(join_op_) || conjuncts_.size() == 0);
-  runtime_profile_->AddLocalTimeCounter(
-      bind<int64_t>(&BlockingJoinNode::LocalTimeCounterFn,
-      runtime_profile_->total_time_counter(),
-      child(0)->runtime_profile()->total_time_counter(),
-      child(1)->runtime_profile()->total_time_counter(),
-      &built_probe_overlap_stop_watch_));
-  return Status::OK();
-}
-
 BlockingJoinNode::~BlockingJoinNode() {
   // probe_batch_ must be cleaned up in Close() to ensure proper resource freeing.
   DCHECK(probe_batch_ == NULL);
@@ -73,6 +74,11 @@ Status BlockingJoinNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
 
+  runtime_profile_->AddLocalTimeCounter(bind<int64_t>(
+      &BlockingJoinNode::LocalTimeCounterFn, runtime_profile_->total_time_counter(),
+      child(0)->runtime_profile()->total_time_counter(),
+      child(1)->runtime_profile()->total_time_counter(),
+      &built_probe_overlap_stop_watch_));
   build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
   probe_timer_ = ADD_TIMER(runtime_profile(), "ProbeTime");
   build_row_counter_ = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT);
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 015738d..e433833 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -33,22 +33,27 @@ namespace impala {
 class RowBatch;
 class TupleRow;
 
+class BlockingJoinPlanNode : public PlanNode {
+ public:
+  /// Subclasses should call BlockingJoinPlanNode::Init() and then perform any other
+  /// Init() work, e.g. creating expr trees.
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override = 0;
+};
+
 /// Abstract base class for join nodes that block while consuming all rows from their
 /// right child in Open(). There is no implementation of Reset() because the Open()
 /// sufficiently covers setting members into a 'reset' state.
 /// TODO: Remove the restriction that the tuples in the join's output row have to
 /// correspond to the order of its child exec nodes. See the DCHECKs in Init().
+
 class BlockingJoinNode : public ExecNode {
  public:
   BlockingJoinNode(const std::string& node_name, const TJoinOp::type join_op,
-      ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+      ObjectPool* pool, const BlockingJoinPlanNode& pnode, const DescriptorTbl& descs);
 
   virtual ~BlockingJoinNode();
 
-  /// Subclasses should call BlockingJoinNode::Init() and then perform any other Init()
-  /// work, e.g. creating expr trees.
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
-
   /// Subclasses should call BlockingJoinNode::Prepare() and then perform any other
   /// Prepare() work, e.g. codegen.
   virtual Status Prepare(RuntimeState* state);
diff --git a/be/src/exec/cardinality-check-node.cc b/be/src/exec/cardinality-check-node.cc
index 454db79..26bca0a 100644
--- a/be/src/exec/cardinality-check-node.cc
+++ b/be/src/exec/cardinality-check-node.cc
@@ -26,10 +26,17 @@
 
 namespace impala {
 
+Status CardinalityCheckPlanNode::CreateExecNode(
+    RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new CardinalityCheckNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
+}
+
 CardinalityCheckNode::CardinalityCheckNode(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-    : ExecNode(pool, tnode, descs),
-      display_statement_(tnode.cardinality_check_node.display_statement) {
+    ObjectPool* pool, const CardinalityCheckPlanNode& pnode, const DescriptorTbl& descs)
+    : ExecNode(pool, pnode, descs),
+      display_statement_(pnode.tnode_->cardinality_check_node.display_statement) {
 }
 
 Status CardinalityCheckNode::Prepare(RuntimeState* state) {
diff --git a/be/src/exec/cardinality-check-node.h b/be/src/exec/cardinality-check-node.h
index d44efd1..591f3ee 100644
--- a/be/src/exec/cardinality-check-node.h
+++ b/be/src/exec/cardinality-check-node.h
@@ -24,6 +24,12 @@
 
 namespace impala {
 
+class CardinalityCheckPlanNode : public PlanNode {
+ public:
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+  ~CardinalityCheckPlanNode(){}
+};
+
 /// Node that returns an error if its child produces more than a single row.
 /// If successful, this node returns a deep copy of its single input row.
 ///
@@ -32,9 +38,10 @@ namespace impala {
 /// might produce results and incorrectly return them to the client. If the child of this
 /// node produces more than one row it means the SQL query is semantically invalid, so no
 /// rows must be returned to the client.
+
 class CardinalityCheckNode : public ExecNode {
  public:
-  CardinalityCheckNode(ObjectPool* pool, const TPlanNode& tnode,
+  CardinalityCheckNode(ObjectPool* pool, const CardinalityCheckPlanNode& pnode,
       const DescriptorTbl& descs);
 
   virtual Status Prepare(RuntimeState* state) override;
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 2273a50..74f26c8 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -69,10 +69,10 @@ const string ERROR_MEM_LIMIT_EXCEEDED = "DataSourceScanNode::$0() failed to allo
 // Size of an encoded TIMESTAMP
 const size_t TIMESTAMP_SIZE = sizeof(int64_t) + sizeof(int32_t);
 
-DataSourceScanNode::DataSourceScanNode(ObjectPool* pool, const TPlanNode& tnode,
+DataSourceScanNode::DataSourceScanNode(ObjectPool* pool, const ScanPlanNode& pnode,
     const DescriptorTbl& descs)
-    : ScanNode(pool, tnode, descs),
-      data_src_node_(tnode.data_source_node),
+    : ScanNode(pool, pnode, descs),
+      data_src_node_(pnode.tnode_->data_source_node),
       tuple_idx_(0),
       num_rows_(0),
       next_row_idx_(0) {
diff --git a/be/src/exec/data-source-scan-node.h b/be/src/exec/data-source-scan-node.h
index e8450fe..30a92a5 100644
--- a/be/src/exec/data-source-scan-node.h
+++ b/be/src/exec/data-source-scan-node.h
@@ -41,7 +41,8 @@ class Tuple;
 /// closed in Close().
 class DataSourceScanNode : public ScanNode {
  public:
-  DataSourceScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  DataSourceScanNode(
+      ObjectPool* pool, const ScanPlanNode& pnode, const DescriptorTbl& descs);
 
   ~DataSourceScanNode();
 
diff --git a/be/src/exec/empty-set-node.cc b/be/src/exec/empty-set-node.cc
index 95d8ab6..4aaac1f 100644
--- a/be/src/exec/empty-set-node.cc
+++ b/be/src/exec/empty-set-node.cc
@@ -18,14 +18,21 @@
 #include "exec/empty-set-node.h"
 
 #include "exec/exec-node-util.h"
+#include "runtime/runtime-state.h"
 
 #include "common/names.h"
 
 namespace impala {
 
+Status EmptySetPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new EmptySetNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
+}
+
 EmptySetNode::EmptySetNode(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs) {}
+    ObjectPool* pool, const EmptySetPlanNode& pnode, const DescriptorTbl& descs)
+  : ExecNode(pool, pnode, descs) {}
 
 Status EmptySetNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
diff --git a/be/src/exec/empty-set-node.h b/be/src/exec/empty-set-node.h
index c59f3ae..bd5c0cd 100644
--- a/be/src/exec/empty-set-node.h
+++ b/be/src/exec/empty-set-node.h
@@ -23,11 +23,19 @@
 
 namespace impala {
 
+class EmptySetPlanNode : public PlanNode {
+ public:
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+  ~EmptySetPlanNode(){}
+};
+
 /// Node that returns an empty result set, i.e., just sets eos_ in GetNext().
 /// Corresponds to EmptySetNode.java in the FE.
+
 class EmptySetNode : public ExecNode {
  public:
-  EmptySetNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  EmptySetNode(
+      ObjectPool* pool, const EmptySetPlanNode& pnode, const DescriptorTbl& descs);
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
 };
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 3f96432..5a59398 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -42,32 +42,44 @@ using namespace impala;
 DEFINE_int64(exchg_node_buffer_size_bytes, 1024 * 1024 * 10,
     "(Advanced) Maximum size of per-query receive-side buffer");
 
+Status ExchangePlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(PlanNode::Init(tnode, state));
+  if (!tnode.exchange_node.__isset.sort_info) return Status::OK();
+
+  RETURN_IF_ERROR(ScalarExpr::Create(tnode.exchange_node.sort_info.ordering_exprs,
+      *row_descriptor_, state, &ordering_exprs_));
+  is_asc_order_ = tnode.exchange_node.sort_info.is_asc_order;
+  nulls_first_ = tnode.exchange_node.sort_info.nulls_first;
+  return Status::OK();
+}
+
+Status ExchangePlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new ExchangeNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
+}
+
 ExchangeNode::ExchangeNode(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs),
+    ObjectPool* pool, const ExchangePlanNode& pnode, const DescriptorTbl& descs)
+  : ExecNode(pool, pnode, descs),
     num_senders_(0),
     stream_recvr_(),
-    input_row_desc_(descs, tnode.exchange_node.input_row_tuples,
-        vector<bool>(
-          tnode.nullable_tuples.begin(),
-          tnode.nullable_tuples.begin() + tnode.exchange_node.input_row_tuples.size())),
+    input_row_desc_(descs, pnode.tnode_->exchange_node.input_row_tuples,
+        vector<bool>(pnode.tnode_->nullable_tuples.begin(),
+                        pnode.tnode_->nullable_tuples.begin()
+                            + pnode.tnode_->exchange_node.input_row_tuples.size())),
     next_row_idx_(0),
-    is_merging_(tnode.exchange_node.__isset.sort_info),
-    offset_(tnode.exchange_node.__isset.offset ? tnode.exchange_node.offset : 0),
+    is_merging_(pnode.tnode_->exchange_node.__isset.sort_info),
+    offset_(pnode.tnode_->exchange_node.__isset.offset ?
+            pnode.tnode_->exchange_node.offset :
+            0),
     num_rows_skipped_(0) {
   DCHECK_GE(offset_, 0);
   DCHECK(is_merging_ || (offset_ == 0));
-}
-
-Status ExchangeNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-  if (!is_merging_) return Status::OK();
-
-  RETURN_IF_ERROR(ScalarExpr::Create(tnode.exchange_node.sort_info.ordering_exprs,
-      row_descriptor_, state, &ordering_exprs_));
-  is_asc_order_ = tnode.exchange_node.sort_info.is_asc_order;
-  nulls_first_ = tnode.exchange_node.sort_info.nulls_first;
-  return Status::OK();
+  if (!is_merging_) return;
+  ordering_exprs_ = pnode.ordering_exprs_;
+  is_asc_order_ = pnode.is_asc_order_;
+  nulls_first_ = pnode.nulls_first_;
 }
 
 Status ExchangeNode::Prepare(RuntimeState* state) {
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index cbd65a9..e0f459d 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -31,6 +31,19 @@ class RowBatch;
 class ScalarExpr;
 class TupleRowComparator;
 
+class ExchangePlanNode : public PlanNode {
+ public:
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+
+  ~ExchangePlanNode(){}
+
+  /// Sort expressions and parameters passed to the merging receiver.
+  std::vector<ScalarExpr*> ordering_exprs_;
+  std::vector<bool> is_asc_order_;
+  std::vector<bool> nulls_first_;
+};
+
 /// Receiver node for data streams. The data stream receiver is created in Prepare()
 /// and closed in Close().
 /// is_merging is set to indicate that rows from different senders must be merged
@@ -42,11 +55,12 @@ class TupleRowComparator;
 /// in its SortExecExprs member that are used to compare rows.
 /// If is_merging_ is false, the exchange node directly retrieves batches from the row
 /// batch queue of the DataStreamRecvrBase via calls to DataStreamRecvrBase::GetBatch().
+
 class ExchangeNode : public ExecNode {
  public:
-  ExchangeNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  ExchangeNode(
+      ObjectPool* pool, const ExchangePlanNode& pnode, const DescriptorTbl& descs);
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
   virtual void Codegen(RuntimeState* state);
   /// Blocks until the first batch is available for consumption via GetNext().
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index e4a851f..16f1a50 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -71,22 +71,158 @@
 using strings::Substitute;
 
 namespace impala {
+Status PlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  tnode_ = &tnode;
+  row_descriptor_ = state->obj_pool()->Add(
+      new RowDescriptor(state->desc_tbl(), tnode_->row_tuples, tnode_->nullable_tuples));
+
+  if (tnode_->node_type != TPlanNodeType::AGGREGATION_NODE) {
+    // In Agg node the conjuncts are executed by the Aggregators.
+    RETURN_IF_ERROR(
+        ScalarExpr::Create(tnode_->conjuncts, *row_descriptor_, state, &conjuncts_));
+  }
+  return Status::OK();
+}
+
+Status PlanNode::CreateTree(
+      RuntimeState* state, const TPlan& plan, PlanNode** root) {
+  if (plan.nodes.size() == 0) {
+    *root = NULL;
+    return Status::OK();
+  }
+  int node_idx = 0;
+  Status status = CreateTreeHelper(state, plan.nodes, NULL, &node_idx, root);
+  if (status.ok() && node_idx + 1 != plan.nodes.size()) {
+    status = Status(
+        "Plan tree only partially reconstructed. Not all thrift nodes were used.");
+  }
+  if (!status.ok()) {
+    LOG(ERROR) << "Could not construct plan tree:\n"
+               << apache::thrift::ThriftDebugString(plan);
+  }
+  return status;
+}
+
+Status PlanNode::CreateTreeHelper(RuntimeState* state,
+      const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx,
+      PlanNode** root) {
+  // propagate error case
+  if (*node_idx >= tnodes.size()) {
+    return Status("Failed to reconstruct plan tree from thrift.");
+  }
+  const TPlanNode& tnode = tnodes[*node_idx];
+
+  int num_children = tnode.num_children;
+  PlanNode* node = NULL;
+  RETURN_IF_ERROR(CreatePlanNode(state->obj_pool(), tnode, &node, state));
+  if (parent != NULL) {
+    parent->children_.push_back(node);
+  } else {
+    *root = node;
+  }
+  for (int i = 0; i < num_children; ++i) {
+    ++*node_idx;
+    RETURN_IF_ERROR(CreateTreeHelper(state, tnodes, node, node_idx, NULL));
+    // we are expecting a child, but have used all nodes
+    // this means we have been given a bad tree and must fail
+    if (*node_idx >= tnodes.size()) {
+      return Status("Failed to reconstruct plan tree from thrift.");
+    }
+  }
+
+  // Call Init() after children have been set and Init()'d themselves
+  RETURN_IF_ERROR(node->Init(tnode, state));
+  return Status::OK();
+}
+
+Status PlanNode::CreatePlanNode(ObjectPool* pool, const TPlanNode& tnode, PlanNode** node,
+      RuntimeState* state) {
+  switch (tnode.node_type) {
+    case TPlanNodeType::HDFS_SCAN_NODE:
+      *node = pool->Add(new HdfsScanPlanNode());
+      break;
+    case TPlanNodeType::HBASE_SCAN_NODE:
+    case TPlanNodeType::DATA_SOURCE_NODE:
+    case TPlanNodeType::KUDU_SCAN_NODE:
+      *node = pool->Add(new ScanPlanNode());
+      break;
+    case TPlanNodeType::AGGREGATION_NODE:
+      *node = pool->Add(new AggregationPlanNode());
+      break;
+    case TPlanNodeType::HASH_JOIN_NODE:
+      *node = pool->Add(new PartitionedHashJoinPlanNode());
+      break;
+    case TPlanNodeType::NESTED_LOOP_JOIN_NODE:
+      *node = pool->Add(new NestedLoopJoinPlanNode());
+      break;
+    case TPlanNodeType::EMPTY_SET_NODE:
+      *node = pool->Add(new EmptySetPlanNode());
+      break;
+    case TPlanNodeType::EXCHANGE_NODE:
+      *node = pool->Add(new ExchangePlanNode());
+      break;
+    case TPlanNodeType::SELECT_NODE:
+      *node = pool->Add(new SelectPlanNode());
+      break;
+    case TPlanNodeType::SORT_NODE:
+      if (tnode.sort_node.type == TSortType::PARTIAL) {
+        *node = pool->Add(new PartialSortPlanNode());
+      } else if (tnode.sort_node.type == TSortType::TOPN) {
+        *node = pool->Add(new TopNPlanNode());
+      } else {
+        DCHECK(tnode.sort_node.type == TSortType::TOTAL);
+        *node = pool->Add(new SortPlanNode());
+      }
+      break;
+    case TPlanNodeType::UNION_NODE:
+      *node = pool->Add(new UnionPlanNode());
+      break;
+    case TPlanNodeType::ANALYTIC_EVAL_NODE:
+      *node = pool->Add(new AnalyticEvalPlanNode());
+      break;
+    case TPlanNodeType::SINGULAR_ROW_SRC_NODE:
+      *node = pool->Add(new SingularRowSrcPlanNode());
+      break;
+    case TPlanNodeType::SUBPLAN_NODE:
+      *node = pool->Add(new SubplanPlanNode());
+      break;
+    case TPlanNodeType::UNNEST_NODE:
+      *node = pool->Add(new UnnestPlanNode());
+      break;
+    case TPlanNodeType::CARDINALITY_CHECK_NODE:
+      *node = pool->Add(new CardinalityCheckPlanNode());
+      break;
+    default:
+      map<int, const char*>::const_iterator i =
+          _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
+      const char* str = "unknown node type";
+      if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) {
+        str = i->second;
+      }
+      stringstream error_msg;
+      error_msg << str << " not implemented";
+      return Status(error_msg.str());
+  }
+  return Status::OK();
+}
 
 const string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate";
 
-ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : id_(tnode.node_id),
-    type_(tnode.node_type),
+ExecNode::ExecNode(ObjectPool* pool, const PlanNode& pnode, const DescriptorTbl& descs)
+  : plan_node_(pnode),
+    id_(pnode.tnode_->node_id),
+    type_(pnode.tnode_->node_type),
     pool_(pool),
-    row_descriptor_(descs, tnode.row_tuples, tnode.nullable_tuples),
-    resource_profile_(tnode.resource_profile),
-    limit_(tnode.limit),
+    conjuncts_(pnode.conjuncts_),
+    row_descriptor_(*(pnode.row_descriptor_)),
+    resource_profile_(pnode.tnode_->resource_profile),
+    limit_(pnode.tnode_->limit),
     runtime_profile_(RuntimeProfile::Create(
-        pool_, Substitute("$0 (id=$1)", PrintThriftEnum(tnode.node_type), id_))),
-    rows_returned_counter_(NULL),
-    rows_returned_rate_(NULL),
-    containing_subplan_(NULL),
-    disable_codegen_(tnode.disable_codegen),
+        pool_, Substitute("$0 (id=$1)", PrintThriftEnum(type_), id_))),
+    rows_returned_counter_(nullptr),
+    rows_returned_rate_(nullptr),
+    containing_subplan_(nullptr),
+    disable_codegen_(pnode.tnode_->disable_codegen),
     num_rows_returned_(0),
     is_closed_(false) {
   runtime_profile_->SetPlanNodeId(id_);
@@ -96,12 +232,6 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
 ExecNode::~ExecNode() {
 }
 
-Status ExecNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(
-      ScalarExpr::Create(tnode.conjuncts, row_descriptor_, state, &conjuncts_));
-  return Status::OK();
-}
-
 Status ExecNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::PREPARE, state));
   DCHECK(runtime_profile_ != NULL);
@@ -184,151 +314,24 @@ void ExecNode::Close(RuntimeState* state) {
   if (events_ != nullptr) events_->MarkEvent("Closed");
 }
 
-Status ExecNode::CreateTree(
-    RuntimeState* state, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) {
-  if (plan.nodes.size() == 0) {
-    *root = NULL;
-    return Status::OK();
-  }
-  int node_idx = 0;
-  Status status = CreateTreeHelper(state, plan.nodes, descs, NULL, &node_idx, root);
-  if (status.ok() && node_idx + 1 != plan.nodes.size()) {
-    status = Status(
-        "Plan tree only partially reconstructed. Not all thrift nodes were used.");
-  }
-  if (!status.ok()) {
-    LOG(ERROR) << "Could not construct plan tree:\n"
-               << apache::thrift::ThriftDebugString(plan);
+Status ExecNode::CreateTree(RuntimeState* state, const PlanNode& plan_node,
+    const DescriptorTbl& descs, ExecNode** root) {
+  RETURN_IF_ERROR(plan_node.CreateExecNode(state, root));
+  DCHECK(*root != nullptr);
+  for (auto& child : plan_node.children_) {
+    ExecNode* child_node;
+    RETURN_IF_ERROR(CreateTree(state, *child, descs, &child_node));
+    DCHECK(child_node != nullptr);
+    (*root)->children_.push_back(child_node);
   }
-  return status;
-}
-
-Status ExecNode::CreateTreeHelper(RuntimeState* state, const vector<TPlanNode>& tnodes,
-    const DescriptorTbl& descs, ExecNode* parent, int* node_idx, ExecNode** root) {
-  // propagate error case
-  if (*node_idx >= tnodes.size()) {
-    return Status("Failed to reconstruct plan tree from thrift.");
-  }
-  const TPlanNode& tnode = tnodes[*node_idx];
-
-  int num_children = tnode.num_children;
-  ExecNode* node = NULL;
-  RETURN_IF_ERROR(CreateNode(state->obj_pool(), tnode, descs, &node, state));
-  if (parent != NULL) {
-    parent->children_.push_back(node);
-  } else {
-    *root = node;
-  }
-  for (int i = 0; i < num_children; ++i) {
-    ++*node_idx;
-    RETURN_IF_ERROR(CreateTreeHelper(state, tnodes, descs, node, node_idx, NULL));
-    // we are expecting a child, but have used all nodes
-    // this means we have been given a bad tree and must fail
-    if (*node_idx >= tnodes.size()) {
-      return Status("Failed to reconstruct plan tree from thrift.");
-    }
-  }
-
-  // Call Init() after children have been set and Init()'d themselves
-  RETURN_IF_ERROR(node->Init(tnode, state));
 
   // build up tree of profiles; add children >0 first, so that when we print
   // the profile, child 0 is printed last (makes the output more readable)
-  for (int i = 1; i < node->children_.size(); ++i) {
-    node->runtime_profile()->AddChild(node->children_[i]->runtime_profile());
-  }
-  if (!node->children_.empty()) {
-    node->runtime_profile()->AddChild(node->children_[0]->runtime_profile(), false);
+  for (int i = 1; i < (*root)->children_.size(); ++i) {
+    (*root)->runtime_profile()->AddChild((*root)->children_[i]->runtime_profile());
   }
-
-  return Status::OK();
-}
-
-Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
-    const DescriptorTbl& descs, ExecNode** node, RuntimeState* state) {
-  stringstream error_msg;
-  switch (tnode.node_type) {
-    case TPlanNodeType::HDFS_SCAN_NODE:
-      *node = pool->Add(tnode.hdfs_scan_node.use_mt_scan_node ?
-              static_cast<HdfsScanNodeBase*>(new HdfsScanNodeMt(pool, tnode, descs)) :
-              static_cast<HdfsScanNodeBase*>(new HdfsScanNode(pool, tnode, descs)));
-      break;
-    case TPlanNodeType::HBASE_SCAN_NODE:
-      *node = pool->Add(new HBaseScanNode(pool, tnode, descs));
-      break;
-    case TPlanNodeType::DATA_SOURCE_NODE:
-      *node = pool->Add(new DataSourceScanNode(pool, tnode, descs));
-      break;
-    case TPlanNodeType::KUDU_SCAN_NODE:
-      RETURN_IF_ERROR(CheckKuduAvailability());
-      if (tnode.kudu_scan_node.use_mt_scan_node) {
-        DCHECK_GT(state->query_options().mt_dop, 0);
-        *node = pool->Add(new KuduScanNodeMt(pool, tnode, descs));
-      } else {
-        DCHECK(state->query_options().mt_dop == 0
-            || state->query_options().num_scanner_threads == 1);
-        *node = pool->Add(new KuduScanNode(pool, tnode, descs));
-      }
-      break;
-    case TPlanNodeType::AGGREGATION_NODE:
-      if (tnode.agg_node.aggregators[0].use_streaming_preaggregation) {
-        *node = pool->Add(new StreamingAggregationNode(pool, tnode, descs));
-      } else {
-        *node = pool->Add(new AggregationNode(pool, tnode, descs));
-      }
-      break;
-    case TPlanNodeType::HASH_JOIN_NODE:
-      *node = pool->Add(new PartitionedHashJoinNode(pool, tnode, descs));
-      break;
-    case TPlanNodeType::NESTED_LOOP_JOIN_NODE:
-      *node = pool->Add(new NestedLoopJoinNode(pool, tnode, descs));
-      break;
-    case TPlanNodeType::EMPTY_SET_NODE:
-      *node = pool->Add(new EmptySetNode(pool, tnode, descs));
-      break;
-    case TPlanNodeType::EXCHANGE_NODE:
-      *node = pool->Add(new ExchangeNode(pool, tnode, descs));
-      break;
-    case TPlanNodeType::SELECT_NODE:
-      *node = pool->Add(new SelectNode(pool, tnode, descs));
-      break;
-    case TPlanNodeType::SORT_NODE:
-      if (tnode.sort_node.type == TSortType::PARTIAL) {
-        *node = pool->Add(new PartialSortNode(pool, tnode, descs));
-      } else if (tnode.sort_node.type == TSortType::TOPN) {
-        *node = pool->Add(new TopNNode(pool, tnode, descs));
-      } else {
-        DCHECK(tnode.sort_node.type == TSortType::TOTAL);
-        *node = pool->Add(new SortNode(pool, tnode, descs));
-      }
-      break;
-    case TPlanNodeType::UNION_NODE:
-      *node = pool->Add(new UnionNode(pool, tnode, descs));
-      break;
-    case TPlanNodeType::ANALYTIC_EVAL_NODE:
-      *node = pool->Add(new AnalyticEvalNode(pool, tnode, descs));
-      break;
-    case TPlanNodeType::SINGULAR_ROW_SRC_NODE:
-      *node = pool->Add(new SingularRowSrcNode(pool, tnode, descs));
-      break;
-    case TPlanNodeType::SUBPLAN_NODE:
-      *node = pool->Add(new SubplanNode(pool, tnode, descs));
-      break;
-    case TPlanNodeType::UNNEST_NODE:
-      *node = pool->Add(new UnnestNode(pool, tnode, descs));
-      break;
-    case TPlanNodeType::CARDINALITY_CHECK_NODE:
-      *node = pool->Add(new CardinalityCheckNode(pool, tnode, descs));
-      break;
-    default:
-      map<int, const char*>::const_iterator i =
-          _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
-      const char* str = "unknown node type";
-      if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) {
-        str = i->second;
-      }
-      error_msg << str << " not implemented";
-      return Status(error_msg.str());
+  if (!(*root)->children_.empty()) {
+    (*root)->runtime_profile()->AddChild((*root)->children_[0]->runtime_profile(), false);
   }
   return Status::OK();
 }
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index a7c0004..2e7a1cb 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -44,30 +44,88 @@ class RowBatch;
 class RuntimeState;
 class ScalarExpr;
 class SubplanNode;
+class SubplanPlanNode;
 class TPlan;
 class TupleRow;
 class TDebugOptions;
-
-/// Superclass of all execution nodes.
+class ExecNode;
+
+/// PlanNode and ExecNode are the super-classes of all plan nodes and execution nodes
+/// respectively. There is one PlanNode instance per fragment; it holds a subset of the
+/// static state of its corresponding ExecNodes. ExecNode holds the runtime state (plus
+/// any static state not yet moved) and can have up to MT_DOP instances per fragment.
+/// Hence every ExecNode has a corresponding PlanNode which may or may not be at the same
+/// level of hierarchy as the ExecNode.
+/// TODO: IMPALA-9216: Move all the static state of ExecNode into PlanNode.
 ///
 /// All subclasses need to make sure to check RuntimeState::is_cancelled()
 /// periodically in order to ensure timely termination after the cancellation
 /// flag gets set.
-/// TODO: Move static state of ExecNode into PlanNode, of which there is one instance
-/// per fragment. ExecNode contains only runtime state and there can be up to MT_DOP
-/// instances of it per fragment.
+
+class PlanNode {
+ public:
+  PlanNode() = default;
+
+  /// Initializes this object from the thrift tnode desc. All internal members created and
+  /// initialized here will be owned by state->obj_pool().
+  /// If overridden in subclass, must first call superclass's Init().
+  /// Should only be called after all children have been set and Init()-ed.
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
+
+  /// Create its corresponding exec node. Place exec node in state->obj_pool().
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const = 0;
+
+  /// Creates plan node tree from list of nodes contained in plan via depth-first
+  /// traversal. All nodes are placed in state->obj_pool() and have Init() called on them.
+  /// Returns error if 'plan' is corrupted, otherwise success.
+  static Status CreateTree(
+      RuntimeState* state, const TPlan& plan, PlanNode** root) WARN_UNUSED_RESULT;
+
+  virtual ~PlanNode(){}
+
+  /// TODO: IMPALA-9216: Add accessor methods for these members instead of making
+  /// them public.
+  /// Reference to the thrift node that represents this PlanNode.
+  const TPlanNode* tnode_ = nullptr;
+
+  /// Conjuncts in this node, created in Init() and owned by state->obj_pool(). Note:
+  /// 'conjuncts_' are not created for Aggregation nodes, whose conjuncts are executed
+  /// by the Aggregators. TODO: move conjuncts to query-state's obj pool.
+  std::vector<ScalarExpr*> conjuncts_;
+
+  RowDescriptor* row_descriptor_ = nullptr;
+
+  /// Runtime filter expressions assigned to this plan node.
+  std::vector<ScalarExpr*> runtime_filter_exprs_;
+
+  std::vector<PlanNode*> children_;
+
+  /// Pointer to the containing SubplanPlanNode or NULL if not inside a subplan.
+  /// Set by the containing SubplanPlanNode::Prepare() before Prepare() is called on
+  /// 'this' node. Not owned.
+  SubplanPlanNode* containing_subplan_ = nullptr;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(PlanNode);
+
+  /// Create a single plan node derived from thrift node; place plan node in 'pool'.
+  static Status CreatePlanNode(ObjectPool* pool, const TPlanNode& tnode, PlanNode** node,
+      RuntimeState* state) WARN_UNUSED_RESULT;
+
+  static Status CreateTreeHelper(RuntimeState* state,
+      const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx,
+      PlanNode** root) WARN_UNUSED_RESULT;
+};
+
 class ExecNode {
  public:
-  /// Init conjuncts.
-  ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  /// Copies over state from the 'pnode', initializes internal objects from 'pool' that
+  /// cannot fail during initialization and finally initializes references to descriptors
+  /// from 'descs'.
+  ExecNode(ObjectPool* pool, const PlanNode& pnode, const DescriptorTbl& descs);
 
   virtual ~ExecNode();
 
-  /// Initializes this object from the thrift tnode desc. The subclass should
-  /// do any initialization that can fail in Init() rather than the ctor.
-  /// If overridden in subclass, must first call superclass's Init().
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT;
-
   /// Sets up internal structures, etc., without doing any actual work.
   /// Must be called prior to Open(). Will only be called once in this
   /// node's lifetime.
@@ -142,9 +200,9 @@ class ExecNode {
   virtual void Close(RuntimeState* state);
 
   /// Creates exec node tree from list of nodes contained in plan via depth-first
-  /// traversal. All nodes are placed in state->obj_pool() and have Init() called on them.
-  /// Returns error if 'plan' is corrupted, otherwise success.
-  static Status CreateTree(RuntimeState* state, const TPlan& plan,
+  /// traversal. All nodes are placed in state->obj_pool().
+  /// Returns error if 'plan_node' is corrupted, otherwise success.
+  static Status CreateTree(RuntimeState* state, const PlanNode& plan_node,
       const DescriptorTbl& descs, ExecNode** root) WARN_UNUSED_RESULT;
 
   /// Set debug action in 'tree' according to debug_options.
@@ -203,9 +261,12 @@ class ExecNode {
 
   ExecNode* child(int i) { return children_[i]; }
   int num_children() const { return children_.size(); }
+
+  /// Valid to call in or after Prepare().
   SubplanNode* get_containing_subplan() const { return containing_subplan_; }
+
   void set_containing_subplan(SubplanNode* sp) {
-    DCHECK(containing_subplan_ == NULL);
+    DCHECK(containing_subplan_ == nullptr);
     containing_subplan_ = sp;
   }
 
@@ -261,8 +322,7 @@ class ExecNode {
   bool IsNodeCodegenDisabled() const;
 
   /// Returns true if this node is inside the right-hand side plan tree of a SubplanNode.
-  /// Valid to call in or after Prepare().
-  bool IsInSubplan() const { return containing_subplan_ != NULL; }
+  bool IsInSubplan() const { return plan_node_.containing_subplan_ != nullptr; }
 
   /// Names of counters shared by all exec nodes
   static const std::string ROW_THROUGHPUT_COUNTER;
@@ -300,6 +360,11 @@ class ExecNode {
     return reservation_manager_.ReleaseUnusedReservation();
   }
 
+  /// Reference to the PlanNode shared across fragment instances.
+  /// TODO: Store a specialized reference directly in the child classes when all static
+  /// state is moved there and accessed directly.
+  const PlanNode& plan_node_;
+
   /// Unique within a single plan tree.
   int id_;
   TPlanNodeType::type type_;
@@ -317,12 +382,12 @@ class ExecNode {
 
   /// Conjuncts and their evaluators in this node. 'conjuncts_' live in the
   /// query-state's object pool while the evaluators live in this exec node's
-  /// object pool.
+  /// object pool. Note: conjunct_evals_ are not created for Aggregation nodes.
   std::vector<ScalarExpr*> conjuncts_;
   std::vector<ScalarExprEvaluator*> conjunct_evals_;
 
   std::vector<ExecNode*> children_;
-  RowDescriptor row_descriptor_;
+  RowDescriptor& row_descriptor_;
 
   /// Resource information sent from the frontend.
   const TBackendResourceProfile resource_profile_;
@@ -356,21 +421,12 @@ class ExecNode {
   boost::scoped_ptr<MemPool> expr_results_pool_;
 
   /// Pointer to the containing SubplanNode or NULL if not inside a subplan.
-  /// Set by SubplanNode::Init(). Not owned.
+  /// Set by SubplanNode::Prepare() before Prepare() is called on 'this' node. Not owned.
   SubplanNode* containing_subplan_;
 
   /// If true, codegen should be disabled for this exec node.
   const bool disable_codegen_;
 
-  /// Create a single exec node derived from thrift node; place exec node in 'pool'.
-  static Status CreateNode(ObjectPool* pool, const TPlanNode& tnode,
-      const DescriptorTbl& descs, ExecNode** node,
-      RuntimeState* state) WARN_UNUSED_RESULT;
-
-  static Status CreateTreeHelper(RuntimeState* state,
-      const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs, ExecNode* parent,
-      int* node_idx, ExecNode** root) WARN_UNUSED_RESULT;
-
   virtual bool IsScanNode() const { return false; }
 
   /// Executes 'debug_action_' if 'phase' matches 'debug_phase_'.
@@ -429,6 +485,8 @@ class ExecNode {
   bool CheckLimitAndTruncateRowBatchIfNeededShared(RowBatch* row_batch, bool* eos);
 
  private:
+  DISALLOW_COPY_AND_ASSIGN(ExecNode);
+
   /// Keeps track of number of rows returned by an exec node. If this variable is shared
   /// by multiple threads, it should be accessed using thread-safe functions defined
   /// above. The single-threaded code-paths should use non-atomic functions defined
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index d94604c..e0d321e 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -82,26 +82,15 @@ static const StreamingHtMinReductionEntry STREAMING_HT_MIN_REDUCTION[] = {
     {2 * 1024 * 1024, 2.0},
 };
 
-static const int STREAMING_HT_MIN_REDUCTION_SIZE =
-    sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
-
-GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-    const TAggregator& taggregator, const DescriptorTbl& descs,
-    int64_t estimated_input_cardinality, int agg_idx)
-  : Aggregator(exec_node, pool, taggregator, descs,
-        Substitute("GroupingAggregator $0", agg_idx), agg_idx),
+/// Captures the static aggregation state that comes directly from the thrift
+/// 'taggregator'. 'intermediate_row_desc_' is built over the intermediate tuple only
+/// and is marked non-nullable. The remaining members are filled in by Init().
+GroupingAggregatorConfig::GroupingAggregatorConfig(
+    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode)
+  : AggregatorConfig(taggregator, state, pnode),
     intermediate_row_desc_(intermediate_tuple_desc_, false),
     is_streaming_preagg_(taggregator.use_streaming_preaggregation),
-    resource_profile_(taggregator.resource_profile),
-    is_in_subplan_(exec_node->IsInSubplan()),
-    limit_(exec_node->limit()),
-    estimated_input_cardinality_(estimated_input_cardinality),
-    partition_pool_(new ObjectPool()) {
-  DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
-}
+    resource_profile_(taggregator.resource_profile) {}
 
-Status GroupingAggregator::Init(const TAggregator& taggregator, RuntimeState* state,
-    const std::vector<TExpr>& conjuncts) {
+Status GroupingAggregatorConfig::Init(
+    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode) {
   RETURN_IF_ERROR(ScalarExpr::Create(
       taggregator.grouping_exprs, input_row_desc_, state, &grouping_exprs_));
 
@@ -110,9 +99,9 @@ Status GroupingAggregator::Init(const TAggregator& taggregator, RuntimeState* st
     SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i];
     DCHECK(desc->type().type == TYPE_NULL || desc->type() == grouping_exprs_[i]->type());
     // Hack to avoid TYPE_NULL SlotRefs.
-    SlotRef* build_expr =
-        pool_->Add(desc->type().type != TYPE_NULL ? new SlotRef(desc) :
-                                                    new SlotRef(desc, TYPE_BOOLEAN));
+    SlotRef* build_expr = state->obj_pool()->Add(desc->type().type != TYPE_NULL ?
+            new SlotRef(desc) :
+            new SlotRef(desc, TYPE_BOOLEAN));
     build_exprs_.push_back(build_expr);
     // Not an entry point because all hash table callers support codegen.
     RETURN_IF_ERROR(
@@ -120,13 +109,35 @@ Status GroupingAggregator::Init(const TAggregator& taggregator, RuntimeState* st
     if (build_expr->type().IsVarLenStringType()) string_grouping_exprs_.push_back(i);
   }
 
-  RETURN_IF_ERROR(Aggregator::Init(taggregator, state, conjuncts));
-  for (int i = 0; i < agg_fns_.size(); ++i) {
-    needs_serialize_ |= agg_fns_[i]->SupportsSerialize();
+  RETURN_IF_ERROR(AggregatorConfig::Init(taggregator, state, pnode));
+  for (int i = 0; i < aggregate_functions_.size(); ++i) {
+    needs_serialize_ |= aggregate_functions_[i]->SupportsSerialize();
   }
   return Status::OK();
 }
 
+static const int STREAMING_HT_MIN_REDUCTION_SIZE =
+    sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
+
+/// Builds a grouping aggregator from the shared 'config'.
+/// The row descriptor, exprs, resource profile and flags are taken from 'config';
+/// 'is_in_subplan_' and 'limit_' are read from 'exec_node'. A new ObjectPool is
+/// allocated for 'partition_pool_'.
+GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
+    const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality,
+    int agg_idx)
+  : Aggregator(
+        exec_node, pool, config, Substitute("GroupingAggregator $0", agg_idx), agg_idx),
+    intermediate_row_desc_(config.intermediate_row_desc_),
+    is_streaming_preagg_(config.is_streaming_preagg_),
+    needs_serialize_(config.needs_serialize_),
+    grouping_exprs_(config.grouping_exprs_),
+    build_exprs_(config.build_exprs_),
+    string_grouping_exprs_(config.string_grouping_exprs_),
+    resource_profile_(config.resource_profile_),
+    is_in_subplan_(exec_node->IsInSubplan()),
+    limit_(exec_node->limit()),
+    estimated_input_cardinality_(estimated_input_cardinality),
+    partition_pool_(new ObjectPool()) {
+  // The fanout constant must equal 2^NUM_PARTITIONING_BITS.
+  DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
+}
+
 Status GroupingAggregator::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(Aggregator::Prepare(state));
   state_ = state;
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index bdcae14..5baac2e 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -33,6 +33,7 @@
 namespace impala {
 
 class AggFnEvaluator;
+class PlanNode;
 class LlvmCodeGen;
 class RowBatch;
 class RuntimeState;
@@ -112,14 +113,48 @@ class Tuple;
 /// There are so many contexts in use that a plain "ctx" variable should never be used.
 /// Likewise, it's easy to mixup the agg fn ctxs, there should be a way to simplify this.
 /// TODO: support an Init() method with an initial value in the UDAF interface.
+
+/// Static state of a grouping aggregator, initialized from the thrift TAggregator.
+/// A GroupingAggregator is constructed from a const reference to this config and
+/// reads its exprs, row descriptor and flags from it.
+class GroupingAggregatorConfig : public AggregatorConfig {
+ public:
+  /// Initializes the members that are taken directly from 'taggregator'.
+  GroupingAggregatorConfig(
+      const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode);
+  /// Creates the grouping and build exprs, records which grouping exprs are var-len
+  /// strings, and computes 'needs_serialize_'.
+  virtual Status Init(
+      const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode) override;
+  ~GroupingAggregatorConfig() {}
+
+  /// Row with the intermediate tuple as its only tuple.
+  /// Construct a new row desc for preparing the build exprs because neither the child's
+  /// nor this node's output row desc may contain the intermediate tuple, e.g.,
+  /// in a single-node plan with an intermediate tuple different from the output tuple.
+  /// Lives in the query state's obj_pool.
+  RowDescriptor intermediate_row_desc_;
+
+  /// True if this is first phase of a two-phase distributed aggregation for which we
+  /// are doing a streaming preaggregation.
+  const bool is_streaming_preagg_;
+
+  /// Resource information sent from the frontend.
+  const TBackendResourceProfile resource_profile_;
+
+  /// True if any of the evaluators require the serialize step.
+  bool needs_serialize_ = false;
+
+  /// Exprs used to insert constructed aggregation tuple into the hash table.
+  /// All the exprs are simply SlotRefs for the intermediate tuple.
+  std::vector<ScalarExpr*> build_exprs_;
+
+  /// Indices of grouping exprs with var-len string types in grouping_exprs_.
+  /// We need to do more work for var-len expressions when allocating and spilling rows.
+  /// All var-len grouping exprs have type string.
+  std::vector<int> string_grouping_exprs_;
+};
+
 class GroupingAggregator : public Aggregator {
  public:
   GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-      const TAggregator& taggregator, const DescriptorTbl& descs,
-      int64_t estimated_input_cardinality, int agg_idx);
+      const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality,
+      int agg_idx);
 
-  virtual Status Init(const TAggregator& taggregator, RuntimeState* state,
-      const std::vector<TExpr>& conjuncts) override;
   virtual Status Prepare(RuntimeState* state) override;
   virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
@@ -176,7 +211,7 @@ class GroupingAggregator : public Aggregator {
   /// nor this node's output row desc may contain the intermediate tuple, e.g.,
   /// in a single-node plan with an intermediate tuple different from the output tuple.
   /// Lives in the query state's obj_pool.
-  RowDescriptor intermediate_row_desc_;
+  const RowDescriptor& intermediate_row_desc_;
 
   /// True if this is first phase of a two-phase distributed aggregation for which we
   /// are doing a streaming preaggregation.
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index 986c5ac..9ec5170 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -39,21 +39,21 @@ using namespace impala;
 PROFILE_DEFINE_TIMER(TotalRawHBaseReadTime, STABLE_HIGH,
     "Aggregate wall clock time spent reading from HBase.");
 
-HBaseScanNode::HBaseScanNode(ObjectPool* pool, const TPlanNode& tnode,
+/// Reads the HBase scan parameters (table name, tuple id, filters) from the thrift
+/// node held by 'pnode'. 'suggested_max_caching_' stays 0 unless the thrift field is
+/// set.
+HBaseScanNode::HBaseScanNode(ObjectPool* pool, const ScanPlanNode& pnode,
                              const DescriptorTbl& descs)
-    : ScanNode(pool, tnode, descs),
-      table_name_(tnode.hbase_scan_node.table_name),
-      tuple_id_(tnode.hbase_scan_node.tuple_id),
+    : ScanNode(pool, pnode, descs),
+      table_name_(pnode.tnode_->hbase_scan_node.table_name),
+      tuple_id_(pnode.tnode_->hbase_scan_node.tuple_id),
       tuple_desc_(NULL),
       tuple_idx_(0),
-      filters_(tnode.hbase_scan_node.filters),
+      filters_(pnode.tnode_->hbase_scan_node.filters),
       hbase_scanner_(NULL),
       row_key_slot_(NULL),
       row_key_binary_encoded_(false),
       text_converter_(new TextConverter('\\', "", false)),
       suggested_max_caching_(0) {
-  if (tnode.hbase_scan_node.__isset.suggested_max_caching) {
-    suggested_max_caching_ = tnode.hbase_scan_node.suggested_max_caching;
+  if (pnode.tnode_->hbase_scan_node.__isset.suggested_max_caching) {
+    suggested_max_caching_ = pnode.tnode_->hbase_scan_node.suggested_max_caching;
   }
 }
 
diff --git a/be/src/exec/hbase-scan-node.h b/be/src/exec/hbase-scan-node.h
index 4c60220..42cb958 100644
--- a/be/src/exec/hbase-scan-node.h
+++ b/be/src/exec/hbase-scan-node.h
@@ -35,7 +35,7 @@ class Tuple;
 ///
 class HBaseScanNode : public ScanNode {
  public:
-  HBaseScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  HBaseScanNode(ObjectPool* pool, const ScanPlanNode& pnode, const DescriptorTbl& descs);
 
   ~HBaseScanNode();
 
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 0471ae3..67fd2fd 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -22,6 +22,8 @@
 #include "exec/hdfs-orc-scanner.h"
 #include "exec/hdfs-plugin-text-scanner.h"
 #include "exec/hdfs-rcfile-scanner.h"
+#include "exec/hdfs-scan-node-mt.h"
+#include "exec/hdfs-scan-node.h"
 #include "exec/hdfs-sequence-scanner.h"
 #include "exec/hdfs-text-scanner.h"
 #include "exec/parquet/hdfs-parquet-scanner.h"
@@ -146,30 +148,8 @@ const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC =
 // Determines how many unexpected remote bytes trigger an error in the runtime state
 const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
 
-HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
-    const DescriptorTbl& descs)
-    : ScanNode(pool, tnode, descs),
-      min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ?
-          tnode.hdfs_scan_node.min_max_tuple_id : -1),
-      skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
-          tnode.hdfs_scan_node.skip_header_line_count : 0),
-      tuple_id_(tnode.hdfs_scan_node.tuple_id),
-      parquet_count_star_slot_offset_(
-          tnode.hdfs_scan_node.__isset.parquet_count_star_slot_offset ?
-          tnode.hdfs_scan_node.parquet_count_star_slot_offset : -1),
-      tuple_desc_(descs.GetTupleDescriptor(tuple_id_)),
-      thrift_dict_filter_conjuncts_map_(
-          tnode.hdfs_scan_node.__isset.dictionary_filter_conjuncts ?
-          &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
-      disks_accessed_bitmap_(TUnit::UNIT, 0),
-      active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
-}
-
-HdfsScanNodeBase::~HdfsScanNodeBase() {
-}
-
-Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(ScanNode::Init(tnode, state));
+Status HdfsScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(ScanPlanNode::Init(tnode, state));
 
   // Add collection item conjuncts
   for (const auto& entry: tnode.hdfs_scan_node.collection_conjuncts) {
@@ -180,21 +160,61 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ScalarExpr::Create(entry.second, *collection_row_desc, state,
         &conjuncts_map_[entry.first]));
   }
-  DCHECK(conjuncts_map_[tuple_id_].empty());
-  conjuncts_map_[tuple_id_] = conjuncts_;
+  const TTupleId& tuple_id = tnode.hdfs_scan_node.tuple_id;
+  DCHECK(conjuncts_map_[tuple_id].empty());
+  conjuncts_map_[tuple_id] = conjuncts_;
 
   // Add min max conjuncts
-  if (min_max_tuple_id_ != -1) {
-    min_max_tuple_desc_ = state->desc_tbl().GetTupleDescriptor(min_max_tuple_id_);
-    DCHECK(min_max_tuple_desc_ != nullptr);
+  if (tnode.hdfs_scan_node.__isset.min_max_tuple_id) {
+    TupleDescriptor* min_max_tuple_desc =
+        state->desc_tbl().GetTupleDescriptor(tnode.hdfs_scan_node.min_max_tuple_id);
+    DCHECK(min_max_tuple_desc != nullptr);
     RowDescriptor* min_max_row_desc = state->obj_pool()->Add(
-        new RowDescriptor(min_max_tuple_desc_, /* is_nullable */ false));
+        new RowDescriptor(min_max_tuple_desc, /* is_nullable */ false));
     RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts,
         *min_max_row_desc, state, &min_max_conjuncts_));
   }
   return Status::OK();
 }
 
+/// Creates the exec node for this plan node in state->obj_pool(): an HdfsScanNodeMt
+/// if the thrift node sets 'use_mt_scan_node', otherwise an HdfsScanNode. The result
+/// is returned through 'node'. Always returns OK.
+Status HdfsScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  // Both branches are cast to the common base so the conditional has a single type.
+  *node = pool->Add(tnode_->hdfs_scan_node.use_mt_scan_node ?
+          static_cast<HdfsScanNodeBase*>(
+              new HdfsScanNodeMt(pool, *this, state->desc_tbl())) :
+          static_cast<HdfsScanNodeBase*>(
+              new HdfsScanNode(pool, *this, state->desc_tbl())));
+  return Status::OK();
+}
+
+/// Initializes the scan node from the shared 'pnode' and the thrift 'hdfs_scan_node'.
+/// Optional thrift fields that are not set fall back to defaults: -1 for
+/// 'min_max_tuple_id' and 'parquet_count_star_slot_offset', 0 for
+/// 'skip_header_line_count', and nullptr for the dictionary filter conjuncts map.
+/// 'min_max_tuple_desc_' is looked up in 'descs' only when a min/max tuple id is set.
+HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const HdfsScanPlanNode& pnode,
+    const THdfsScanNode& hdfs_scan_node, const DescriptorTbl& descs)
+  : ScanNode(pool, pnode, descs),
+    min_max_tuple_id_(
+        hdfs_scan_node.__isset.min_max_tuple_id ? hdfs_scan_node.min_max_tuple_id : -1),
+    min_max_tuple_desc_(
+        min_max_tuple_id_ == -1 ? nullptr : descs.GetTupleDescriptor(min_max_tuple_id_)),
+    skip_header_line_count_(hdfs_scan_node.__isset.skip_header_line_count ?
+            hdfs_scan_node.skip_header_line_count :
+            0),
+    tuple_id_(hdfs_scan_node.tuple_id),
+    parquet_count_star_slot_offset_(
+        hdfs_scan_node.__isset.parquet_count_star_slot_offset ?
+            hdfs_scan_node.parquet_count_star_slot_offset :
+            -1),
+    tuple_desc_(descs.GetTupleDescriptor(tuple_id_)),
+    thrift_dict_filter_conjuncts_map_(hdfs_scan_node.__isset.dictionary_filter_conjuncts ?
+            &hdfs_scan_node.dictionary_filter_conjuncts :
+            nullptr),
+    disks_accessed_bitmap_(TUnit::UNIT, 0),
+    active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
+  // Copy the conjuncts that the plan node already holds.
+  conjuncts_map_ = pnode.conjuncts_map_;
+  min_max_conjuncts_ = pnode.min_max_conjuncts_;
+}
+
+HdfsScanNodeBase::~HdfsScanNodeBase() {
+}
+
 /// TODO: Break up this very long function.
 Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index b05006d..8e118bb 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -98,6 +98,19 @@ struct ScanRangeMetadata {
       : partition_id(partition_id), original_split(original_split) { }
 };
 
+/// Plan node for HDFS scans. Init() creates the per-tuple conjuncts and the min/max
+/// conjuncts from the thrift node; CreateExecNode() instantiates the matching
+/// HDFS scan exec node.
+class HdfsScanPlanNode : public ScanPlanNode {
+ public:
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+
+  /// Conjuncts for each materialized tuple (top-level row batch tuples and collection
+  /// item tuples). Includes a copy of PlanNode.conjuncts_.
+  typedef std::unordered_map<TupleId, std::vector<ScalarExpr*>> ConjunctsMap;
+  ConjunctsMap conjuncts_map_;
+
+  /// Conjuncts to evaluate on parquet::Statistics.
+  std::vector<ScalarExpr*> min_max_conjuncts_;
+};
 
 /// Base class for all Hdfs scan nodes. Contains common members and functions
 /// that are independent of whether batches are materialized by the main thread
@@ -155,13 +168,13 @@ struct ScanRangeMetadata {
 ///
 /// TODO: Once the legacy scan node has been removed, several functions can be made
 /// non-virtual. Also merge this class with HdfsScanNodeMt.
+
 class HdfsScanNodeBase : public ScanNode {
  public:
-  HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  HdfsScanNodeBase(ObjectPool* pool, const HdfsScanPlanNode& pnode,
+      const THdfsScanNode& hdfs_scan_node, const DescriptorTbl& descs);
   ~HdfsScanNodeBase();
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state)
-      override WARN_UNUSED_RESULT;
   virtual Status Prepare(RuntimeState* state) override WARN_UNUSED_RESULT;
   virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override WARN_UNUSED_RESULT;
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 63f0b80..d8a2b02 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -33,9 +33,9 @@ using namespace impala::io;
 
 namespace impala {
 
-HdfsScanNodeMt::HdfsScanNodeMt(ObjectPool* pool, const TPlanNode& tnode,
+/// Forwards 'pnode' and its thrift hdfs_scan_node struct to the base class.
+/// 'scan_range_' and 'scanner_' start out null.
+HdfsScanNodeMt::HdfsScanNodeMt(ObjectPool* pool, const HdfsScanPlanNode& pnode,
                            const DescriptorTbl& descs)
-    : HdfsScanNodeBase(pool, tnode, descs),
+    : HdfsScanNodeBase(pool, pnode, pnode.tnode_->hdfs_scan_node, descs),
       scan_range_(NULL),
       scanner_(NULL) {
 }
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
index 2dc0ac3..98ff6c2 100644
--- a/be/src/exec/hdfs-scan-node-mt.h
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -37,7 +37,8 @@ class TPlanNode;
 /// in the thread calling GetNext(). Uses the HdfsScanner::GetNext() interface.
 class HdfsScanNodeMt : public HdfsScanNodeBase {
  public:
-  HdfsScanNodeMt(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  HdfsScanNodeMt(
+      ObjectPool* pool, const HdfsScanPlanNode& pnode, const DescriptorTbl& descs);
   ~HdfsScanNodeMt();
 
   virtual Status Prepare(RuntimeState* state) override WARN_UNUSED_RESULT;
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index f839bc4..63b57ad 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -68,9 +68,9 @@ const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11;
 // checking if a scanner thread should yield itself back to the global thread pool.
 const int SCANNER_THREAD_WAIT_TIME_MS = 20;
 
-HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
+/// Forwards 'pnode' and its thrift hdfs_scan_node struct to the base class.
+HdfsScanNode::HdfsScanNode(ObjectPool* pool, const HdfsScanPlanNode& pnode,
                            const DescriptorTbl& descs)
-    : HdfsScanNodeBase(pool, tnode, descs) {
+    : HdfsScanNodeBase(pool, pnode, pnode.tnode_->hdfs_scan_node, descs) {
 }
 
 HdfsScanNode::~HdfsScanNode() {
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index eec58b2..d073262 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -70,7 +70,8 @@ class TPlanNode;
 /// fully functional.
 class HdfsScanNode : public HdfsScanNodeBase {
  public:
-  HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  HdfsScanNode(
+      ObjectPool* pool, const HdfsScanPlanNode& pnode, const DescriptorTbl& descs);
   ~HdfsScanNode();
 
   virtual Status Prepare(RuntimeState* state) override WARN_UNUSED_RESULT;
diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index 875675a..e86c14c 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -53,12 +53,12 @@ const string KuduScanNodeBase::KUDU_REMOTE_TOKENS = "KuduRemoteScanTokens";
 const string KuduScanNodeBase::KUDU_CLIENT_TIME = "KuduClientTime";
 
+/// Reads the tuple id from the thrift node held by 'pnode'. 'count_star_slot_offset_'
+/// is -1 when the thrift field is not set. DCHECKs that Kudu is available.
 KuduScanNodeBase::KuduScanNodeBase(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : ScanNode(pool, tnode, descs),
-    tuple_id_(tnode.kudu_scan_node.tuple_id),
+    ObjectPool* pool, const ScanPlanNode& pnode, const DescriptorTbl& descs)
+  : ScanNode(pool, pnode, descs),
+    tuple_id_(pnode.tnode_->kudu_scan_node.tuple_id),
     count_star_slot_offset_(
-            tnode.kudu_scan_node.__isset.count_star_slot_offset ?
-            tnode.kudu_scan_node.count_star_slot_offset : -1) {
+            pnode.tnode_->kudu_scan_node.__isset.count_star_slot_offset ?
+            pnode.tnode_->kudu_scan_node.count_star_slot_offset : -1) {
   DCHECK(KuduIsAvailable());
 }
 
diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h
index 6aa5e28..768e904 100644
--- a/be/src/exec/kudu-scan-node-base.h
+++ b/be/src/exec/kudu-scan-node-base.h
@@ -35,7 +35,8 @@ class KuduScanner;
 /// removed.
 class KuduScanNodeBase : public ScanNode {
  public:
-  KuduScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  KuduScanNodeBase(
+      ObjectPool* pool, const ScanPlanNode& pnode, const DescriptorTbl& descs);
   ~KuduScanNodeBase();
 
   virtual Status Prepare(RuntimeState* state) override;
diff --git a/be/src/exec/kudu-scan-node-mt.cc b/be/src/exec/kudu-scan-node-mt.cc
index 5d444d8..412e66a 100644
--- a/be/src/exec/kudu-scan-node-mt.cc
+++ b/be/src/exec/kudu-scan-node-mt.cc
@@ -32,9 +32,9 @@
 
 namespace impala {
 
-KuduScanNodeMt::KuduScanNodeMt(ObjectPool* pool, const TPlanNode& tnode,
+/// Forwards 'pnode' to the base class; 'scan_token_' starts out null.
+/// DCHECKs that Kudu is available.
+KuduScanNodeMt::KuduScanNodeMt(ObjectPool* pool, const ScanPlanNode& pnode,
     const DescriptorTbl& descs)
-    : KuduScanNodeBase(pool, tnode, descs),
+    : KuduScanNodeBase(pool, pnode, descs),
       scan_token_(nullptr) {
   DCHECK(KuduIsAvailable());
 }
diff --git a/be/src/exec/kudu-scan-node-mt.h b/be/src/exec/kudu-scan-node-mt.h
index 94a238e..dff99ca 100644
--- a/be/src/exec/kudu-scan-node-mt.h
+++ b/be/src/exec/kudu-scan-node-mt.h
@@ -31,7 +31,7 @@ class KuduScanner;
 /// in the thread calling GetNext().
 class KuduScanNodeMt : public KuduScanNodeBase {
  public:
-  KuduScanNodeMt(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  KuduScanNodeMt(ObjectPool* pool, const ScanPlanNode& pnode, const DescriptorTbl& descs);
 
   ~KuduScanNodeMt();
 
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index fe08731..be71022 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -54,9 +54,9 @@ DEFINE_int64_hidden(kudu_scanner_thread_max_estimated_bytes, 32L * 1024L * 1024L
 
 namespace impala {
 
-KuduScanNode::KuduScanNode(ObjectPool* pool, const TPlanNode& tnode,
+KuduScanNode::KuduScanNode(ObjectPool* pool, const ScanPlanNode& pnode,
     const DescriptorTbl& descs)
-    : KuduScanNodeBase(pool, tnode, descs),
+    : KuduScanNodeBase(pool, pnode, descs),
       done_(false),
       thread_avail_cb_id_(-1) {
   DCHECK(KuduIsAvailable());
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index 59977a1..6d41f33 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -36,7 +36,7 @@ class ThreadResourcePool;
 /// are used to retrieve the rows for this scan.
 class KuduScanNode : public KuduScanNodeBase {
  public:
-  KuduScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  KuduScanNode(ObjectPool* pool, const ScanPlanNode& pnode, const DescriptorTbl& descs);
 
   ~KuduScanNode();
 
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 2278258..6be017e 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -36,32 +36,42 @@
 using namespace impala;
 using namespace strings;
 
-NestedLoopJoinNode::NestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnode,
-    const DescriptorTbl& descs)
-  : BlockingJoinNode("NestedLoopJoinNode", tnode.nested_loop_join_node.join_op, pool,
-        tnode, descs),
+/// Initializes the base plan node, then creates 'join_conjuncts_' from the thrift
+/// join conjuncts against a row descriptor combining both children's rows.
+/// DCHECKs that a cross join carries no join conjuncts.
+/// Returns the first error from the base Init() or from expr creation.
+Status NestedLoopJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(BlockingJoinPlanNode::Init(tnode, state));
+  DCHECK(tnode.__isset.nested_loop_join_node);
+  // join_conjunct_evals_ are evaluated in the context of rows assembled from
+  // all inner and outer tuples.
+  RowDescriptor full_row_desc(
+      *children_[0]->row_descriptor_, *children_[1]->row_descriptor_);
+  RETURN_IF_ERROR(ScalarExpr::Create(tnode.nested_loop_join_node.join_conjuncts,
+      full_row_desc, state, &join_conjuncts_));
+  DCHECK(tnode.nested_loop_join_node.join_op != TJoinOp::CROSS_JOIN
+      || join_conjuncts_.size() == 0)
+      << "Join conjuncts in a cross join";
+  return Status::OK();
+}
+
+/// Creates a NestedLoopJoinNode for this plan node in state->obj_pool() and returns
+/// it through 'node'. Always returns OK.
+Status NestedLoopJoinPlanNode::CreateExecNode(
+    RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new NestedLoopJoinNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
+}
+
+/// Passes the join op from the thrift node held by 'pnode' to BlockingJoinNode and
+/// copies the join conjuncts from 'pnode'.
+NestedLoopJoinNode::NestedLoopJoinNode(
+    ObjectPool* pool, const NestedLoopJoinPlanNode& pnode, const DescriptorTbl& descs)
+  : BlockingJoinNode("NestedLoopJoinNode", pnode.tnode_->nested_loop_join_node.join_op,
+        pool, pnode, descs),
     build_batches_(NULL),
     current_build_row_idx_(0),
     process_unmatched_build_rows_(false) {
+  join_conjuncts_ = pnode.join_conjuncts_;
 }
 
 NestedLoopJoinNode::~NestedLoopJoinNode() {
   DCHECK(is_closed());
 }
 
-Status NestedLoopJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(BlockingJoinNode::Init(tnode, state));
-  DCHECK(tnode.__isset.nested_loop_join_node);
-  // join_conjunct_evals_ are evaluated in the context of rows assembled from
-  // all inner and outer tuples.
-  RowDescriptor full_row_desc(*child(0)->row_desc(), *child(1)->row_desc());
-  RETURN_IF_ERROR(ScalarExpr::Create(tnode.nested_loop_join_node.join_conjuncts,
-      full_row_desc, state, &join_conjuncts_));
-  DCHECK(tnode.nested_loop_join_node.join_op != TJoinOp::CROSS_JOIN ||
-      join_conjuncts_.size() == 0) << "Join conjuncts in a cross join";
-  return Status::OK();
-}
-
 Status NestedLoopJoinNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   ScopedOpenEventAdder ea(this);
diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h
index 6fc09dc..e3cf2b3 100644
--- a/be/src/exec/nested-loop-join-node.h
+++ b/be/src/exec/nested-loop-join-node.h
@@ -33,18 +33,29 @@ class Bitmap;
 class RowBatch;
 class TupleRow;
 
+/// Plan node for the nested-loop join. Holds the join conjuncts created in Init();
+/// CreateExecNode() instantiates a NestedLoopJoinNode from this plan node.
+class NestedLoopJoinPlanNode : public BlockingJoinPlanNode {
+ public:
+  /// Join conjuncts.
+  std::vector<ScalarExpr*> join_conjuncts_;
+
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+
+  ~NestedLoopJoinPlanNode(){}
+};
+
 /// Operator to perform nested-loop join. The build side is implemented by NljBuilder.
 /// This operator does not support spill to disk. Supports all join modes except
 /// null-aware left anti-join.
 ///
 /// TODO: Add support for null-aware left-anti join.
+
 class NestedLoopJoinNode : public BlockingJoinNode {
  public:
-  NestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnode,
-      const DescriptorTbl& descs);
+  NestedLoopJoinNode(
+      ObjectPool* pool, const NestedLoopJoinPlanNode& pnode, const DescriptorTbl& descs);
   virtual ~NestedLoopJoinNode();
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
diff --git a/be/src/exec/non-grouping-aggregator.cc b/be/src/exec/non-grouping-aggregator.cc
index 2bcaa98..e2e8c05 100644
--- a/be/src/exec/non-grouping-aggregator.cc
+++ b/be/src/exec/non-grouping-aggregator.cc
@@ -35,10 +35,10 @@
 
 namespace impala {
 
-NonGroupingAggregator::NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-    const TAggregator& taggregator, const DescriptorTbl& descs, int agg_idx)
-  : Aggregator(exec_node, pool, taggregator, descs,
-        Substitute("NonGroupingAggregator $0", agg_idx), agg_idx) {}
+/// Forwards everything to Aggregator, naming this aggregator
+/// "NonGroupingAggregator <agg_idx>".
+NonGroupingAggregator::NonGroupingAggregator(
+    ExecNode* exec_node, ObjectPool* pool, const AggregatorConfig& config, int agg_idx)
+  : Aggregator(exec_node, pool, config, Substitute("NonGroupingAggregator $0", agg_idx),
+        agg_idx) {}
 
 Status NonGroupingAggregator::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(Aggregator::Prepare(state));
diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h
index 3f18a71..bad0263 100644
--- a/be/src/exec/non-grouping-aggregator.h
+++ b/be/src/exec/non-grouping-aggregator.h
@@ -27,6 +27,7 @@
 namespace impala {
 
 class AggFnEvaluator;
+class AggregationPlanNode;
 class DescriptorTbl;
 class ExecNode;
 class LlvmCodeGen;
@@ -41,8 +42,8 @@ class Tuple;
 /// not support streaming preaggregation.
 class NonGroupingAggregator : public Aggregator {
  public:
-  NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-      const TAggregator& taggregator, const DescriptorTbl& descs, int agg_idx);
+  NonGroupingAggregator(
+      ExecNode* exec_node, ObjectPool* pool, const AggregatorConfig& config, int agg_idx);
 
   virtual Status Prepare(RuntimeState* state) override;
   virtual void Codegen(RuntimeState* state) override;
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index bf049b7..f8cdf8b 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -27,34 +27,44 @@
 
 namespace impala {
 
-PartialSortNode::PartialSortNode(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs),
-    sorter_(nullptr),
-    input_batch_index_(0),
-    input_eos_(false),
-    sorter_eos_(true) {}
-
-PartialSortNode::~PartialSortNode() {
-  DCHECK(input_batch_.get() == nullptr);
-}
-
-Status PartialSortNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status PartialSortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   DCHECK(!tnode.sort_node.__isset.offset || tnode.sort_node.offset == 0);
-  DCHECK(limit_ == -1);
+  RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   const TSortInfo& tsort_info = tnode.sort_node.sort_info;
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
   RETURN_IF_ERROR(ScalarExpr::Create(
-      tsort_info.ordering_exprs, row_descriptor_, state, &ordering_exprs_));
+      tsort_info.ordering_exprs, *row_descriptor_, state, &ordering_exprs_));
   DCHECK(tsort_info.__isset.sort_tuple_slot_exprs);
   RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.sort_tuple_slot_exprs,
-      *child(0)->row_desc(), state, &sort_tuple_exprs_));
+      *children_[0]->row_descriptor_, state, &sort_tuple_slot_exprs_));
   is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
   nulls_first_ = tnode.sort_node.sort_info.nulls_first;
-  runtime_profile()->AddInfoString("SortType", "Partial");
   return Status::OK();
 }
 
+Status PartialSortPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new PartialSortNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
+}
+
+PartialSortNode::PartialSortNode(
+    ObjectPool* pool, const PartialSortPlanNode& pnode, const DescriptorTbl& descs)
+  : ExecNode(pool, pnode, descs),
+    sorter_(nullptr),
+    input_batch_index_(0),
+    input_eos_(false),
+    sorter_eos_(true) {
+  ordering_exprs_ = pnode.ordering_exprs_;
+  sort_tuple_exprs_ = pnode.sort_tuple_slot_exprs_;
+  is_asc_order_ = pnode.is_asc_order_;
+  nulls_first_ = pnode.nulls_first_;
+  runtime_profile()->AddInfoString("SortType", "Partial");
+}
+
+PartialSortNode::~PartialSortNode() {
+  DCHECK(input_batch_.get() == nullptr);
+}
+
 Status PartialSortNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index 156d574..12d460a 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -23,6 +23,24 @@
 
 namespace impala {
 
+class PartialSortPlanNode : public PlanNode {
+ public:
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+
+  ~PartialSortPlanNode(){}
+
+  /// Expressions and parameters used for tuple comparison.
+  std::vector<ScalarExpr*> ordering_exprs_;
+
+  /// Expressions used to materialize slots in the tuples to be sorted.
+  /// One expr per slot in the materialized tuple.
+  std::vector<ScalarExpr*> sort_tuple_slot_exprs_;
+
+  std::vector<bool> is_asc_order_;
+  std::vector<bool> nulls_first_;
+};
+
 /// Node that implements a partial sort, where its input is divided up into runs, each
 /// of which is sorted individually.
 ///
@@ -40,12 +58,13 @@ namespace impala {
 /// single tuple is then what the sort operates on.
 ///
 /// PartialSortNode does not support limits or offsets.
+
 class PartialSortNode : public ExecNode {
  public:
-  PartialSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  PartialSortNode(
+      ObjectPool* pool, const PartialSortPlanNode& pnode, const DescriptorTbl& descs);
   ~PartialSortNode();
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
   virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 763111d..737a963 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -48,57 +48,78 @@ static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
 using namespace impala;
 using strings::Substitute;
 
-PartitionedHashJoinNode::PartitionedHashJoinNode(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : BlockingJoinNode(
-        "PartitionedHashJoinNode", tnode.hash_join_node.join_op, pool, tnode, descs) {
-  memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
-}
-
-PartitionedHashJoinNode::~PartitionedHashJoinNode() {
-  // Check that we didn't leak any memory.
-  DCHECK(null_probe_rows_ == NULL);
-}
-
-Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(BlockingJoinNode::Init(tnode, state));
+Status PartitionedHashJoinPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(BlockingJoinPlanNode::Init(tnode, state));
   DCHECK(tnode.__isset.hash_join_node);
   const vector<TEqJoinCondition>& eq_join_conjuncts =
       tnode.hash_join_node.eq_join_conjuncts;
-  // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
-  // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
-  // being separated out further.
-  builder_.reset(new PhjBuilder(id(), label(), join_op_,
-      child(1)->row_desc(), state, buffer_pool_client(),
-      resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size));
-  RETURN_IF_ERROR(
-      builder_->InitExprsAndFilters(state, eq_join_conjuncts, tnode.runtime_filters));
+  // TODO: change PhjBuilder::InitExprsAndFilters to accept the runtime filter contexts
+  // and build_exprs_ generated here in init and not create its own. Then pass those in
+  // during Prepare phase.
 
   for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
     ScalarExpr* probe_expr;
     RETURN_IF_ERROR(ScalarExpr::Create(
-        eq_join_conjunct.left, *child(0)->row_desc(), state, &probe_expr));
+        eq_join_conjunct.left, *children_[0]->row_descriptor_, state, &probe_expr));
     probe_exprs_.push_back(probe_expr);
     ScalarExpr* build_expr;
     RETURN_IF_ERROR(ScalarExpr::Create(
-        eq_join_conjunct.right, *child(1)->row_desc(), state, &build_expr));
+        eq_join_conjunct.right, *children_[1]->row_descriptor_, state, &build_expr));
     build_exprs_.push_back(build_expr);
   }
   // other_join_conjuncts_ are evaluated in the context of rows assembled from all build
   // and probe tuples; full_row_desc is not necessarily the same as the output row desc,
   // e.g., because semi joins only return the build xor probe tuples
-  RowDescriptor full_row_desc(*child(0)->row_desc(), *child(1)->row_desc());
+  RowDescriptor full_row_desc(
+      *children_[0]->row_descriptor_, *children_[1]->row_descriptor_);
   RETURN_IF_ERROR(ScalarExpr::Create(tnode.hash_join_node.other_join_conjuncts,
       full_row_desc, state, &other_join_conjuncts_));
-  DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || eq_join_conjuncts.size() == 1);
+  DCHECK(tnode.hash_join_node.join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
+      || eq_join_conjuncts.size() == 1);
+  return Status::OK();
+}
+
+Status PartitionedHashJoinPlanNode::CreateExecNode(
+    RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new PartitionedHashJoinNode(pool, *this, state->desc_tbl()));
   return Status::OK();
 }
 
+PartitionedHashJoinNode::PartitionedHashJoinNode(ObjectPool* pool,
+    const PartitionedHashJoinPlanNode& pnode, const DescriptorTbl& descs)
+  : BlockingJoinNode("PartitionedHashJoinNode", pnode.tnode_->hash_join_node.join_op,
+        pool, pnode, descs) {
+  memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
+  build_exprs_ = pnode.build_exprs_;
+  probe_exprs_ = pnode.probe_exprs_;
+  other_join_conjuncts_ = pnode.other_join_conjuncts_;
+}
+
+PartitionedHashJoinNode::~PartitionedHashJoinNode() {
+  // Check that we didn't leak any memory.
+  DCHECK(null_probe_rows_ == NULL);
+}
+
 Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
 
   RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
   runtime_state_ = state;
+  const vector<TEqJoinCondition>& eq_join_conjuncts =
+      plan_node_.tnode_->hash_join_node.eq_join_conjuncts;
+  // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
+  // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
+  // being separated out further.
+  // TODO: Move the builder creation on Expr Init into the constructor once the PlanRoot
+  // Equivalent of a sink is implemented. build_exprs_ and filter_exprs can be passed
+  // directly from those generated in Phj node, only thing left to do is register the
+  // filters.
+  builder_.reset(new PhjBuilder(id(), label(), join_op_, child(1)->row_desc(), state,
+      buffer_pool_client(), resource_profile_.spillable_buffer_size,
+      resource_profile_.max_row_buffer_size));
+  RETURN_IF_ERROR(builder_->InitExprsAndFilters(
+      state, eq_join_conjuncts, plan_node_.tnode_->runtime_filters));
 
   RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
   runtime_profile()->PrependChild(builder_->profile());
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 5ff8ab2..38c274f 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -38,6 +38,22 @@ class RowBatch;
 class RuntimeFilter;
 class TupleRow;
 
+class PartitionedHashJoinPlanNode : public BlockingJoinPlanNode {
+ public:
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+
+  ~PartitionedHashJoinPlanNode(){}
+
+  /// Our equi-join predicates "<lhs> = <rhs>" are separated into
+  /// build_exprs_ (over child(1)) and probe_exprs_ (over child(0))
+  std::vector<ScalarExpr*> build_exprs_;
+  std::vector<ScalarExpr*> probe_exprs_;
+
+  /// Non-equi-join conjuncts from the ON clause.
+  std::vector<ScalarExpr*> other_join_conjuncts_;
+};
+
 /// Operator to perform partitioned hash join, spilling to disk as necessary. This
 /// operator implements multiple join modes with the same code algorithm.
 ///
@@ -109,13 +125,13 @@ class TupleRow;
 /// NULLs into several different streams, which are processed in a separate step to
 /// produce additional output rows. The NAAJ algorithm is documented in more detail in
 /// header comments for the null aware functions and data structures.
+
 class PartitionedHashJoinNode : public BlockingJoinNode {
  public:
-  PartitionedHashJoinNode(ObjectPool* pool, const TPlanNode& tnode,
+  PartitionedHashJoinNode(ObjectPool* pool, const PartitionedHashJoinPlanNode& pnode,
       const DescriptorTbl& descs);
   virtual ~PartitionedHashJoinNode();
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
   virtual Status Prepare(RuntimeState* state) override;
   virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
@@ -536,7 +552,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// State of the probing algorithm. Used to drive the state machine in GetNext().
   ProbeState probe_state_ = ProbeState::PROBE_COMPLETE;
 
-  /// The build-side of the join. Initialized in Init().
+  /// The build-side of the join. Initialized in Prepare().
   boost::scoped_ptr<PhjBuilder> builder_;
 
   /// Last set of hash partitions obtained from builder_. Only valid when the
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 2334e0b..0f135c4 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -20,6 +20,10 @@
 #include <boost/algorithm/string/join.hpp>
 #include <boost/bind.hpp>
 
+#include "exec/data-source-scan-node.h"
+#include "exec/hbase-scan-node.h"
+#include "exec/kudu-scan-node-mt.h"
+#include "exec/kudu-scan-node.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/blocking-row-batch-queue.h"
 #include "runtime/io/disk-io-mgr.h"
@@ -92,27 +96,60 @@ PROFILE_DEFINE_HIGH_WATER_MARK_COUNTER(PeakScannerThreadConcurrency, STABLE_LOW,
 
 const string ScanNode::SCANNER_THREAD_COUNTERS_PREFIX = "ScannerThreads";
 
-bool ScanNode::IsDataCacheDisabled() const {
-  return runtime_state()->query_options().disable_data_cache;
-}
-
-Status ScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
+Status ScanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   const TQueryOptions& query_options = state->query_options();
   for (const TRuntimeFilterDesc& filter_desc : tnode.runtime_filters) {
     auto it = filter_desc.planid_to_target_ndx.find(tnode.node_id);
     DCHECK(it != filter_desc.planid_to_target_ndx.end());
     const TRuntimeFilterTargetDesc& target = filter_desc.targets[it->second];
-    DCHECK(state->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL ||
-        target.is_local_target);
-    DCHECK(!query_options.disable_row_runtime_filtering ||
-        target.is_bound_by_partition_columns);
+    DCHECK(state->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL
+        || target.is_local_target);
+    DCHECK(!query_options.disable_row_runtime_filtering
+        || target.is_bound_by_partition_columns);
     ScalarExpr* filter_expr;
     RETURN_IF_ERROR(
-        ScalarExpr::Create(target.target_expr, *row_desc(), state, &filter_expr));
-    filter_exprs_.push_back(filter_expr);
+        ScalarExpr::Create(target.target_expr, *row_descriptor_, state, &filter_expr));
+    runtime_filter_exprs_.push_back(filter_expr);
+  }
+  return Status::OK();
+}
+
+Status ScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  switch (tnode_->node_type) {
+    case TPlanNodeType::HBASE_SCAN_NODE:
+      *node = pool->Add(new HBaseScanNode(pool, *this, state->desc_tbl()));
+      break;
+    case TPlanNodeType::DATA_SOURCE_NODE:
+      *node = pool->Add(new DataSourceScanNode(pool, *this, state->desc_tbl()));
+      break;
+    case TPlanNodeType::KUDU_SCAN_NODE:
+      if (tnode_->kudu_scan_node.use_mt_scan_node) {
+        DCHECK_GT(state->query_options().mt_dop, 0);
+        *node = pool->Add(new KuduScanNodeMt(pool, *this, state->desc_tbl()));
+      } else {
+        DCHECK(state->query_options().mt_dop == 0
+            || state->query_options().num_scanner_threads == 1);
+        *node = pool->Add(new KuduScanNode(pool, *this, state->desc_tbl()));
+      }
+      break;
+    default:
+      DCHECK(false) << "Unexpected scan node type: " << tnode_->node_type;
+  }
+  return Status::OK();
+}
+
+bool ScanNode::IsDataCacheDisabled() const {
+  return runtime_state()->query_options().disable_data_cache;
+}
 
-    // TODO: Move this to Prepare()
+Status ScanNode::Prepare(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  runtime_state_ = state;
+  RETURN_IF_ERROR(ExecNode::Prepare(state));
+
+  for (const TRuntimeFilterDesc& filter_desc : plan_node_.tnode_->runtime_filters) {
     filter_ctxs_.emplace_back();
     FilterContext& filter_ctx = filter_ctxs_.back();
     filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, false);
@@ -128,14 +165,6 @@ Status ScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
     }
   }
 
-  return Status::OK();
-}
-
-Status ScanNode::Prepare(RuntimeState* state) {
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  runtime_state_ = state;
-  RETURN_IF_ERROR(ExecNode::Prepare(state));
-
   rows_read_counter_ = PROFILE_RowsRead.Instantiate(runtime_profile());
   materialize_tuple_timer_ = PROFILE_MaterializeTupleTime.Instantiate(runtime_profile());
 
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 2154c65..23aaa88 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -29,6 +29,12 @@ namespace impala {
 class BlockingRowBatchQueue;
 class TScanRange;
 
+class ScanPlanNode : public PlanNode {
+ public:
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+};
+
 /// Abstract base class of all scan nodes. Subclasses support different storage layers
 /// and different threading models.
 ///
@@ -91,13 +97,15 @@ class TScanRange;
 ///   RowBatchQueuePeakMemoryUsage - peak memory consumption of row batches enqueued in
 ///     the scan node's output queue.
 ///
+
 class ScanNode : public ExecNode {
  public:
-  ScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-    : ExecNode(pool, tnode, descs),
-      scan_range_params_(NULL) {}
+  ScanNode(ObjectPool* pool, const ScanPlanNode& pnode, const DescriptorTbl& descs)
+    : ExecNode(pool, pnode, descs),
+      scan_range_params_(NULL) {
+    filter_exprs_ = pnode.runtime_filter_exprs_;
+  }
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT;
   virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
   virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
 
diff --git a/be/src/exec/select-node.cc b/be/src/exec/select-node.cc
index a8708f3..dc689ff 100644
--- a/be/src/exec/select-node.cc
+++ b/be/src/exec/select-node.cc
@@ -31,15 +31,20 @@
 
 namespace impala {
 
-SelectNode::SelectNode(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-    : ExecNode(pool, tnode, descs),
-      child_row_batch_(NULL),
-      child_row_idx_(0),
-      child_eos_(false),
-      codegend_copy_rows_fn_(nullptr) {
+Status SelectPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new SelectNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
 }
 
+SelectNode::SelectNode(
+    ObjectPool* pool, const SelectPlanNode& pnode, const DescriptorTbl& descs)
+  : ExecNode(pool, pnode, descs),
+    child_row_batch_(NULL),
+    child_row_idx_(0),
+    child_eos_(false),
+    codegend_copy_rows_fn_(nullptr) {}
+
 Status SelectNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
diff --git a/be/src/exec/select-node.h b/be/src/exec/select-node.h
index c4d8098..6d8e5cd 100644
--- a/be/src/exec/select-node.h
+++ b/be/src/exec/select-node.h
@@ -30,11 +30,18 @@ namespace impala {
 class Tuple;
 class TupleRow;
 
+class SelectPlanNode : public PlanNode {
+ public:
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+  ~SelectPlanNode(){}
+};
+
 /// Node that evaluates conjuncts and enforces a limit but otherwise passes along
 /// the rows pulled from its child unchanged.
+
 class SelectNode : public ExecNode {
  public:
-  SelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  SelectNode(ObjectPool* pool, const SelectPlanNode& pnode, const DescriptorTbl& descs);
 
   virtual Status Prepare(RuntimeState* state) override;
   virtual void Codegen(RuntimeState* state) override;
diff --git a/be/src/exec/singular-row-src-node.cc b/be/src/exec/singular-row-src-node.cc
index 02a3fb6..24ab531 100644
--- a/be/src/exec/singular-row-src-node.cc
+++ b/be/src/exec/singular-row-src-node.cc
@@ -22,10 +22,17 @@
 
 namespace impala {
 
-SingularRowSrcNode::SingularRowSrcNode(ObjectPool* pool, const TPlanNode& tnode,
-    const DescriptorTbl& descs) : ExecNode(pool, tnode, descs) {
+Status SingularRowSrcPlanNode::CreateExecNode(
+    RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new SingularRowSrcNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
 }
 
+SingularRowSrcNode::SingularRowSrcNode(
+    ObjectPool* pool, const SingularRowSrcPlanNode& pnode, const DescriptorTbl& descs)
+  : ExecNode(pool, pnode, descs) {}
+
 Status SingularRowSrcNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   // We do not time this function, check for cancellation, or perform the usual per-batch
   // query maintenance because those would dominate the execution cost of this node.
diff --git a/be/src/exec/singular-row-src-node.h b/be/src/exec/singular-row-src-node.h
index ae51887..c6843ba 100644
--- a/be/src/exec/singular-row-src-node.h
+++ b/be/src/exec/singular-row-src-node.h
@@ -22,12 +22,19 @@
 
 namespace impala {
 
+class SingularRowSrcPlanNode : public PlanNode {
+ public:
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+  ~SingularRowSrcPlanNode(){}
+};
+
 /// Exec node that returns a single row from its containing Subplan node.
 /// Does not have any children.
+
 class SingularRowSrcNode : public ExecNode {
  public:
-  SingularRowSrcNode(ObjectPool* pool, const TPlanNode& tnode,
-      const DescriptorTbl& descs);
+  SingularRowSrcNode(
+      ObjectPool* pool, const SingularRowSrcPlanNode& pnode, const DescriptorTbl& descs);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
 };
 
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 659e71c..2efe40e 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -27,30 +27,41 @@
 
 namespace impala {
 
-SortNode::SortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs),
-    offset_(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
-    sorter_(NULL),
-    num_rows_skipped_(0) {
-}
-
-SortNode::~SortNode() {
-}
-
-Status SortNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status SortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   const TSortInfo& tsort_info = tnode.sort_node.sort_info;
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-  RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.ordering_exprs, row_descriptor_,
-      state, &ordering_exprs_));
+  RETURN_IF_ERROR(ScalarExpr::Create(
+      tsort_info.ordering_exprs, *row_descriptor_, state, &ordering_exprs_));
   DCHECK(tsort_info.__isset.sort_tuple_slot_exprs);
   RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.sort_tuple_slot_exprs,
-      *child(0)->row_desc(), state, &sort_tuple_exprs_));
+      *children_[0]->row_descriptor_, state, &sort_tuple_slot_exprs_));
   is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
   nulls_first_ = tnode.sort_node.sort_info.nulls_first;
-  runtime_profile()->AddInfoString("SortType", "Total");
   return Status::OK();
 }
 
+Status SortPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new SortNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
+}
+
+SortNode::SortNode(
+    ObjectPool* pool, const SortPlanNode& pnode, const DescriptorTbl& descs)
+  : ExecNode(pool, pnode, descs),
+    offset_(pnode.tnode_->sort_node.__isset.offset ? pnode.tnode_->sort_node.offset : 0),
+    sorter_(NULL),
+    num_rows_skipped_(0) {
+  ordering_exprs_ = pnode.ordering_exprs_;
+  sort_tuple_exprs_ = pnode.sort_tuple_slot_exprs_;
+  is_asc_order_ = pnode.is_asc_order_;
+  nulls_first_ = pnode.nulls_first_;
+  runtime_profile()->AddInfoString("SortType", "Total");
+}
+
+SortNode::~SortNode() {
+}
+
 Status SortNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index ce7b188..f6fdd52 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -23,6 +23,24 @@
 
 namespace impala {
 
+class SortPlanNode : public PlanNode {
+ public:
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+
+  ~SortPlanNode(){}
+
+  /// Expressions and parameters used for tuple comparison.
+  std::vector<ScalarExpr*> ordering_exprs_;
+
+  /// Expressions used to materialize slots in the tuples to be sorted.
+  /// One expr per slot in the materialized tuple.
+  std::vector<ScalarExpr*> sort_tuple_slot_exprs_;
+
+  std::vector<bool> is_asc_order_;
+  std::vector<bool> nulls_first_;
+};
+
 /// Node that implements a full sort of its input with a fixed memory budget, spilling
 /// to disk if the input is larger than available memory.
 /// Uses Sorter for the external sort implementation.
@@ -32,12 +50,12 @@ namespace impala {
 /// in Open() to fill it with sorted rows.
 /// If a merge phase was performed in the sort, sorted rows are deep copied into
 /// the output batch. Otherwise, the sorter instance owns the sorted data.
+
 class SortNode : public ExecNode {
  public:
-  SortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  SortNode(ObjectPool* pool, const SortPlanNode& pnode, const DescriptorTbl& descs);
   ~SortNode();
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
   virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
diff --git a/be/src/exec/streaming-aggregation-node.cc b/be/src/exec/streaming-aggregation-node.cc
index 1a00070..ef34a0b 100644
--- a/be/src/exec/streaming-aggregation-node.cc
+++ b/be/src/exec/streaming-aggregation-node.cc
@@ -33,12 +33,12 @@
 namespace impala {
 
 StreamingAggregationNode::StreamingAggregationNode(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : AggregationNodeBase(pool, tnode, descs) {
-  DCHECK(tnode.conjuncts.empty()) << "Preaggs have no conjuncts";
+    ObjectPool* pool, const AggregationPlanNode& pnode, const DescriptorTbl& descs)
+  : AggregationNodeBase(pool, pnode, descs) {
+  DCHECK(pnode.tnode_->conjuncts.empty()) << "Preaggs have no conjuncts";
   DCHECK(limit_ == -1) << "Preaggs have no limits";
-  for (int i = 0; i < tnode.agg_node.aggregators.size(); ++i) {
-    DCHECK(tnode.agg_node.aggregators[i].use_streaming_preaggregation);
+  for (auto& t_agg : pnode.tnode_->agg_node.aggregators) {
+    DCHECK(t_agg.use_streaming_preaggregation);
   }
 }
 
diff --git a/be/src/exec/streaming-aggregation-node.h b/be/src/exec/streaming-aggregation-node.h
index 4b1ca74..6af70b0 100644
--- a/be/src/exec/streaming-aggregation-node.h
+++ b/be/src/exec/streaming-aggregation-node.h
@@ -46,7 +46,7 @@ class RuntimeState;
 class StreamingAggregationNode : public AggregationNodeBase {
  public:
   StreamingAggregationNode(
-      ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+      ObjectPool* pool, const AggregationPlanNode& pnode, const DescriptorTbl& descs);
 
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
diff --git a/be/src/exec/subplan-node.cc b/be/src/exec/subplan-node.cc
index d19d77c..85addfd 100644
--- a/be/src/exec/subplan-node.cc
+++ b/be/src/exec/subplan-node.cc
@@ -27,9 +27,42 @@
 
 namespace impala {
 
-SubplanNode::SubplanNode(ObjectPool* pool, const TPlanNode& tnode,
-    const DescriptorTbl& descs)
-    : ExecNode(pool, tnode, descs),
+Status SubplanPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(PlanNode::Init(tnode, state));
+  DCHECK_EQ(children_.size(), 2);
+  RETURN_IF_ERROR(SetContainingSubplan(state, this, children_[1]));
+  return Status::OK();
+}
+
+Status SubplanPlanNode::SetContainingSubplan(
+    RuntimeState* state, SubplanPlanNode* ancestor, PlanNode* node) {
+  node->containing_subplan_ = ancestor;
+  if (node->tnode_->node_type == TPlanNodeType::SUBPLAN_NODE) {
+    // Only traverse the first child and not the second one, because the Subplan
+    // parent of nodes inside it should be 'node' and not 'ancestor'.
+    RETURN_IF_ERROR(SetContainingSubplan(state, ancestor, node->children_[0]));
+  } else {
+    if (node->tnode_->node_type == TPlanNodeType::UNNEST_NODE) {
+      // The node type was checked just above, so a plain downcast is sufficient.
+      UnnestPlanNode* unnest_node = static_cast<UnnestPlanNode*>(node);
+      RETURN_IF_ERROR(unnest_node->InitCollExpr(state));
+    }
+    for (PlanNode* child : node->children_) {
+      RETURN_IF_ERROR(SetContainingSubplan(state, ancestor, child));
+    }
+  }
+  return Status::OK();
+}
+
+Status SubplanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new SubplanNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
+}
+
+SubplanNode::SubplanNode(
+    ObjectPool* pool, const SubplanPlanNode& pnode, const DescriptorTbl& descs)
+    : ExecNode(pool, pnode, descs),
       input_batch_(NULL),
       input_eos_(false),
       input_row_idx_(0),
@@ -38,35 +71,23 @@ SubplanNode::SubplanNode(ObjectPool* pool, const TPlanNode& tnode,
       subplan_eos_(false) {
 }
 
-Status SubplanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-  DCHECK_EQ(children_.size(), 2);
-  RETURN_IF_ERROR(SetContainingSubplan(state, this, child(1)));
-  return Status::OK();
-}
-
-Status SubplanNode::SetContainingSubplan(
-    RuntimeState* state, SubplanNode* ancestor, ExecNode* node) {
+void SubplanNode::SetContainingSubplan(SubplanNode* ancestor, ExecNode* node) {
   node->set_containing_subplan(ancestor);
   if (node->type() == TPlanNodeType::SUBPLAN_NODE) {
     // Only traverse the first child and not the second one, because the Subplan
     // parent of nodes inside it should be 'node' and not 'ancestor'.
-    RETURN_IF_ERROR(SetContainingSubplan(state, ancestor, node->child(0)));
+    SetContainingSubplan(ancestor, node->child(0));
   } else {
-    if (node->type() == TPlanNodeType::UNNEST_NODE) {
-      UnnestNode* unnest_node = reinterpret_cast<UnnestNode*>(node);
-      RETURN_IF_ERROR(unnest_node->InitCollExpr(state));
-    }
     int num_children = node->num_children();
     for (int i = 0; i < num_children; ++i) {
-      RETURN_IF_ERROR(SetContainingSubplan(state, ancestor, node->child(i)));
+      SetContainingSubplan(ancestor, node->child(i));
     }
   }
-  return Status::OK();
 }
 
 Status SubplanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  SetContainingSubplan(this, child(1));
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   input_batch_.reset(
       new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
diff --git a/be/src/exec/subplan-node.h b/be/src/exec/subplan-node.h
index ecf7c71..6517df8 100644
--- a/be/src/exec/subplan-node.h
+++ b/be/src/exec/subplan-node.h
@@ -25,6 +25,21 @@ namespace impala {
 
 class TupleRow;
 
+class SubplanPlanNode : public PlanNode {
+ public:
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+
+  /// Sets 'ancestor' as the containing Subplan in all plan nodes inside the plan-node
+  /// tree rooted at 'node' and does any initialization that is required as a result of
+  /// setting the subplan. Doesn't traverse the second child of SubplanPlanNodes within
+  /// 'node'.
+  Status SetContainingSubplan(
+      RuntimeState* state, SubplanPlanNode* ancestor, PlanNode* node);
+
+  ~SubplanPlanNode(){}
+};
+
 /// For every input row from its first child, a SubplanNode evaluates and pulls all
 /// results from its second child, resetting the second child after every input row.
 /// A SubplanNode does not create any output rows itself. It merely
@@ -46,11 +61,11 @@ class TupleRow;
 /// The resources owned by batches from the first child of this node are always
 /// transferred to the output batch right before fetching a new batch from the
 /// first child.
+
 class SubplanNode : public ExecNode {
  public:
-  SubplanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  SubplanNode(ObjectPool* pool, const SubplanPlanNode& pnode, const DescriptorTbl& descs);
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
@@ -62,10 +77,10 @@ class SubplanNode : public ExecNode {
   friend class UnnestNode;
 
   /// Sets 'ancestor' as the containing Subplan in all exec nodes inside the exec-node
-  /// tree rooted at 'node' and does any initialization that is required as a result of
-  /// setting the subplan. Doesn't traverse the second child of SubplanNodes within
-  /// 'node'.
-  Status SetContainingSubplan(RuntimeState* state, SubplanNode* ancestor, ExecNode* node);
+  /// tree rooted at 'node'. Doesn't traverse the second child of SubplanNodes within
+  /// 'node'. Should be called in Prepare() before calling ExecNode::Prepare() so that
+  /// all child nodes have the containing subplan set before Prepare() is called on them.
+  void SetContainingSubplan(SubplanNode* ancestor, ExecNode* node);
 
   /// Returns the current row from child(0) or NULL if no rows from child(0) have been
   /// retrieved yet (GetNext() has not yet been called). This function is called by
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index 6b32b94..b67e03a 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -40,35 +40,46 @@
 using std::priority_queue;
 using namespace impala;
 
-TopNNode::TopNNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs),
-    offset_(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
-    output_tuple_desc_(row_descriptor_.tuple_descriptors()[0]),
-    tuple_row_less_than_(NULL),
-    tmp_tuple_(NULL),
-    tuple_pool_(NULL),
-    codegend_insert_batch_fn_(NULL),
-    rows_to_reclaim_(0),
-    tuple_pool_reclaim_counter_(NULL),
-    num_rows_skipped_(0) {
-}
-
-Status TopNNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+Status TopNPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   const TSortInfo& tsort_info = tnode.sort_node.sort_info;
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-  RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.ordering_exprs, row_descriptor_,
+  RETURN_IF_ERROR(PlanNode::Init(tnode, state));
+  RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.ordering_exprs, *row_descriptor_,
       state, &ordering_exprs_));
   DCHECK(tsort_info.__isset.sort_tuple_slot_exprs);
   RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.sort_tuple_slot_exprs,
-      *child(0)->row_desc(), state, &output_tuple_exprs_));
+      *children_[0]->row_descriptor_, state, &output_tuple_exprs_));
   is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
   nulls_first_ = tnode.sort_node.sort_info.nulls_first;
   DCHECK_EQ(conjuncts_.size(), 0)
       << "TopNNode should never have predicates to evaluate.";
-  runtime_profile()->AddInfoString("SortType", "TopN");
   return Status::OK();
 }
 
+Status TopNPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new TopNNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
+}
+
+TopNNode::TopNNode(
+    ObjectPool* pool, const TopNPlanNode& pnode, const DescriptorTbl& descs)
+  : ExecNode(pool, pnode, descs),
+    offset_(pnode.tnode_->sort_node.__isset.offset ? pnode.tnode_->sort_node.offset : 0),
+    output_tuple_desc_(row_descriptor_.tuple_descriptors()[0]),
+    tuple_row_less_than_(NULL),
+    tmp_tuple_(NULL),
+    tuple_pool_(NULL),
+    codegend_insert_batch_fn_(NULL),
+    rows_to_reclaim_(0),
+    tuple_pool_reclaim_counter_(NULL),
+    num_rows_skipped_(0) {
+  ordering_exprs_ = pnode.ordering_exprs_;
+  output_tuple_exprs_ = pnode.output_tuple_exprs_;
+  is_asc_order_ = pnode.is_asc_order_;
+  nulls_first_ = pnode.nulls_first_;
+  runtime_profile()->AddInfoString("SortType", "TopN");
+}
+
 Status TopNNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   DCHECK(output_tuple_desc_ != nullptr);
diff --git a/be/src/exec/topn-node.h b/be/src/exec/topn-node.h
index a80d766..53abab9 100644
--- a/be/src/exec/topn-node.h
+++ b/be/src/exec/topn-node.h
@@ -33,16 +33,33 @@ class MemPool;
 class RuntimeState;
 class Tuple;
 
+class TopNPlanNode : public PlanNode {
+ public:
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+
+  ~TopNPlanNode(){}
+
+  /// Ordering expressions used for tuple comparison.
+  std::vector<ScalarExpr*> ordering_exprs_;
+
+  /// Materialization exprs for the output tuple.
+  std::vector<ScalarExpr*> output_tuple_exprs_;
+
+  std::vector<bool> is_asc_order_;
+  std::vector<bool> nulls_first_;
+};
+
 /// Node for in-memory TopN (ORDER BY ... LIMIT)
 /// This handles the case where the result fits in memory.
 /// This node will materialize its input rows into a new tuple using the expressions
 /// in sort_tuple_slot_exprs_ in its sort_exec_exprs_ member.
 /// TopN is implemented by storing rows in a priority queue.
+
 class TopNNode : public ExecNode {
  public:
-  TopNNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  TopNNode(ObjectPool* pool, const TopNPlanNode& pnode, const DescriptorTbl& descs);
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
   virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index 9d6c8a1..3f716c3 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -32,30 +32,19 @@
 
 using namespace impala;
 
-UnionNode::UnionNode(ObjectPool* pool, const TPlanNode& tnode,
-    const DescriptorTbl& descs)
-    : ExecNode(pool, tnode, descs),
-      tuple_id_(tnode.union_node.tuple_id),
-      tuple_desc_(descs.GetTupleDescriptor(tuple_id_)),
-      first_materialized_child_idx_(tnode.union_node.first_materialized_child_idx),
-      child_idx_(0),
-      child_batch_(nullptr),
-      child_row_idx_(0),
-      child_eos_(false),
-      const_exprs_lists_idx_(0),
-      to_close_child_idx_(-1) { }
-
-Status UnionNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
+Status UnionPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(PlanNode::Init(tnode, state));
   DCHECK(tnode.__isset.union_node);
   DCHECK_EQ(conjuncts_.size(), 0);
-  DCHECK(tuple_desc_ != nullptr);
+  const TupleDescriptor* tuple_desc =
+      state->desc_tbl().GetTupleDescriptor(tnode.union_node.tuple_id);
+  DCHECK(tuple_desc != nullptr);
   // Create const_exprs_lists_ from thrift exprs.
   const vector<vector<TExpr>>& const_texpr_lists = tnode.union_node.const_expr_lists;
   for (const vector<TExpr>& texprs : const_texpr_lists) {
     vector<ScalarExpr*> const_exprs;
-    RETURN_IF_ERROR(ScalarExpr::Create(texprs, *row_desc(), state, &const_exprs));
-    DCHECK_EQ(const_exprs.size(), tuple_desc_->slots().size());
+    RETURN_IF_ERROR(ScalarExpr::Create(texprs, *row_descriptor_, state, &const_exprs));
+    DCHECK_EQ(const_exprs.size(), tuple_desc->slots().size());
     const_exprs_lists_.push_back(const_exprs);
   }
   // Create child_exprs_lists_ from thrift exprs.
@@ -64,13 +53,35 @@ Status UnionNode::Init(const TPlanNode& tnode, RuntimeState* state) {
     const vector<TExpr>& texprs = thrift_result_exprs[i];
     vector<ScalarExpr*> child_exprs;
     RETURN_IF_ERROR(
-        ScalarExpr::Create(texprs, *child(i)->row_desc(), state, &child_exprs));
+        ScalarExpr::Create(texprs, *children_[i]->row_descriptor_, state, &child_exprs));
     child_exprs_lists_.push_back(child_exprs);
-    DCHECK_EQ(child_exprs.size(), tuple_desc_->slots().size());
+    DCHECK_EQ(child_exprs.size(), tuple_desc->slots().size());
   }
   return Status::OK();
 }
 
+Status UnionPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new UnionNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
+}
+
+UnionNode::UnionNode(
+    ObjectPool* pool, const UnionPlanNode& pnode, const DescriptorTbl& descs)
+  : ExecNode(pool, pnode, descs),
+    tuple_id_(pnode.tnode_->union_node.tuple_id),
+    tuple_desc_(descs.GetTupleDescriptor(tuple_id_)),
+    first_materialized_child_idx_(pnode.tnode_->union_node.first_materialized_child_idx),
+    child_idx_(0),
+    child_batch_(nullptr),
+    child_row_idx_(0),
+    child_eos_(false),
+    const_exprs_lists_idx_(0),
+    to_close_child_idx_(-1) {
+  const_exprs_lists_ = pnode.const_exprs_lists_;
+  child_exprs_lists_ = pnode.child_exprs_lists_;
+}
+
 Status UnionNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h
index b459392..9812121 100644
--- a/be/src/exec/union-node.h
+++ b/be/src/exec/union-node.h
@@ -36,6 +36,21 @@ class Tuple;
 class TupleRow;
 class TPlanNode;
 
+class UnionPlanNode : public PlanNode {
+ public:
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+
+  ~UnionPlanNode(){}
+
+  /// Const exprs materialized by this node. These exprs don't refer to any children.
+  /// Only materialized by the first fragment instance to avoid duplication.
+  std::vector<std::vector<ScalarExpr*>> const_exprs_lists_;
+
+  /// Exprs materialized by this node. The i-th result expr list refers to the i-th child.
+  std::vector<std::vector<ScalarExpr*>> child_exprs_lists_;
+};
+
 /// Node that merges the results of its children by either materializing their
 /// evaluated expressions into row batches or passing through (forwarding) the
 /// batches if the input tuple layout is identical to the output tuple layout
@@ -43,11 +58,11 @@ class TPlanNode;
 /// such that all passthrough children come before the children that need
 /// materialization. The union node pulls from its children sequentially, i.e.
 /// it exhausts one child completely before moving on to the next one.
+
 class UnionNode : public ExecNode {
  public:
-  UnionNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  UnionNode(ObjectPool* pool, const UnionPlanNode& pnode, const DescriptorTbl& descs);
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
   virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc
index 1dde338..1dc8522 100644
--- a/be/src/exec/unnest-node.cc
+++ b/be/src/exec/unnest-node.cc
@@ -31,11 +31,41 @@ namespace impala {
 
 const CollectionValue UnnestNode::EMPTY_COLLECTION_VALUE;
 
-UnnestNode::UnnestNode(ObjectPool* pool, const TPlanNode& tnode,
-    const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs),
+Status UnnestPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  DCHECK(tnode.__isset.unnest_node);
+  RETURN_IF_ERROR(PlanNode::Init(tnode, state));
+  return Status::OK();
+}
+
+Status UnnestPlanNode::InitCollExpr(RuntimeState* state) {
+  DCHECK(containing_subplan_ != nullptr)
+      << "set_containing_subplan() must have been called";
+  const RowDescriptor& row_desc = *containing_subplan_->children_[0]->row_descriptor_;
+  RETURN_IF_ERROR(ScalarExpr::Create(
+      tnode_->unnest_node.collection_expr, row_desc, state, &collection_expr_));
+  DCHECK(collection_expr_->IsSlotRef());
+
+  // Set the coll_slot_desc_ and the corresponding tuple index used for manually
+  // evaluating the collection SlotRef and for projection.
+  DCHECK(collection_expr_->IsSlotRef());
+  const SlotRef* slot_ref = static_cast<SlotRef*>(collection_expr_);
+  coll_slot_desc_ = state->desc_tbl().GetSlotDescriptor(slot_ref->slot_id());
+  DCHECK(coll_slot_desc_ != nullptr);
+  coll_tuple_idx_ = row_desc.GetTupleIdx(coll_slot_desc_->parent()->id());
+  return Status::OK();
+}
+
+Status UnnestPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new UnnestNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
+}
+
+UnnestNode::UnnestNode(
+    ObjectPool* pool, const UnnestPlanNode& pnode, const DescriptorTbl& descs)
+  : ExecNode(pool, pnode, descs),
     item_byte_size_(0),
-    thrift_coll_expr_(tnode.unnest_node.collection_expr),
+    thrift_coll_expr_(pnode.tnode_->unnest_node.collection_expr),
     coll_expr_(nullptr),
     coll_expr_eval_(nullptr),
     coll_slot_desc_(nullptr),
@@ -50,21 +80,9 @@ UnnestNode::UnnestNode(ObjectPool* pool, const TPlanNode& tnode,
     max_collection_size_counter_(nullptr),
     min_collection_size_counter_(nullptr),
     num_collections_counter_(nullptr) {
-}
-
-Status UnnestNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  DCHECK(tnode.__isset.unnest_node);
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-  return Status::OK();
-}
-
-Status UnnestNode::InitCollExpr(RuntimeState* state) {
-  DCHECK(containing_subplan_ != nullptr)
-      << "set_containing_subplan() must have been called";
-  const RowDescriptor& row_desc = *containing_subplan_->child(0)->row_desc();
-  RETURN_IF_ERROR(ScalarExpr::Create(thrift_coll_expr_, row_desc, state, &coll_expr_));
-  DCHECK(coll_expr_->IsSlotRef());
-  return Status::OK();
+  coll_expr_ = pnode.collection_expr_;
+  coll_slot_desc_ = pnode.coll_slot_desc_;
+  coll_tuple_idx_ = pnode.coll_tuple_idx_;
 }
 
 Status UnnestNode::Prepare(RuntimeState* state) {
@@ -87,16 +105,6 @@ Status UnnestNode::Prepare(RuntimeState* state) {
 
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(*coll_expr_, state, pool_,
       expr_perm_pool(), expr_results_pool(), &coll_expr_eval_));
-
-  // Set the coll_slot_desc_ and the corresponding tuple index used for manually
-  // evaluating the collection SlotRef and for projection.
-  DCHECK(coll_expr_->IsSlotRef());
-  const SlotRef* slot_ref = static_cast<SlotRef*>(coll_expr_);
-  coll_slot_desc_ = state->desc_tbl().GetSlotDescriptor(slot_ref->slot_id());
-  DCHECK(coll_slot_desc_ != nullptr);
-  const RowDescriptor* row_desc = containing_subplan_->child(0)->row_desc();
-  coll_tuple_idx_ = row_desc->GetTupleIdx(coll_slot_desc_->parent()->id());
-
   return Status::OK();
 }
 
diff --git a/be/src/exec/unnest-node.h b/be/src/exec/unnest-node.h
index ff4d661..86339bf 100644
--- a/be/src/exec/unnest-node.h
+++ b/be/src/exec/unnest-node.h
@@ -26,6 +26,29 @@ namespace impala {
 
 class TupleDescriptor;
 
+class UnnestPlanNode : public PlanNode {
+ public:
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+  /// Initializes the expression which produces the collection to be unnested.
+  /// Called by the containing subplan plan-node.
+  Status InitCollExpr(RuntimeState* state);
+
+  ~UnnestPlanNode(){}
+
+  /// Expr that produces the collection to be unnested. Currently always a SlotRef into a
+  /// collection-typed slot. We do not evaluate this expr for setting coll_value_, but
+  /// instead manually retrieve the slot value to support projection (see class comment).
+  ScalarExpr* collection_expr_;
+
+  /// Descriptor of the collection-typed slot referenced by collection_expr_. Set in
+  /// InitCollExpr(). This slot is always set to NULL in Open() as a simple projection.
+  const SlotDescriptor* coll_slot_desc_;
+
+  /// Tuple index corresponding to coll_slot_desc_. Set in InitCollExpr().
+  int coll_tuple_idx_;
+};
+
 /// Exec node that scans an in-memory collection of tuples (a CollectionValue) producing
 /// one output row per tuple in the collection. The output row is composed of a single
 /// tuple - the collection's item tuple.
@@ -51,21 +74,17 @@ class TupleDescriptor;
 /// TODO: Setting the collection-typed slots to NULL should be replaced by a proper
 /// projection at materialization points. The current solution purposely ignores the
 /// conventional NULL semantics of slots - it is a temporary hack which must be removed.
+
 class UnnestNode : public ExecNode {
  public:
-  UnnestNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  UnnestNode(ObjectPool* pool, const UnnestPlanNode& pnode, const DescriptorTbl& descs);
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
   virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
   virtual void Close(RuntimeState* state);
 
-  /// Initializes the expression which produces the collection to be unnested.
-  /// Called by the containing subplan node.
-  Status InitCollExpr(RuntimeState* state);
-
  private:
   friend class SubplanNode;
 
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 1b1b343..dde5b79 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -172,10 +172,13 @@ Status FragmentInstanceState::Prepare() {
       bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
           runtime_state_->resource_pool()));
 
+  PlanNode* plan_tree = nullptr;
+  RETURN_IF_ERROR(
+      PlanNode::CreateTree(runtime_state_, fragment_ctx_.fragment.plan, &plan_tree));
+  plan_tree_ = plan_tree;
   // set up plan
   RETURN_IF_ERROR(ExecNode::CreateTree(
-      runtime_state_, fragment_ctx_.fragment.plan, query_state_->desc_tbl(),
-      &exec_tree_));
+      runtime_state_, *plan_tree_, query_state_->desc_tbl(), &exec_tree_));
   runtime_state_->set_fragment_root_id(exec_tree_->id());
   if (instance_ctx_.__isset.debug_options) {
     ExecNode::SetDebugOptions(instance_ctx_.debug_options, exec_tree_);
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index f3aa895..7b059c6 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -47,6 +47,7 @@ class TQueryCtx;
 class QueryState;
 class RuntimeProfile;
 class ExecNode;
+class PlanNode;
 class PlanRootSink;
 class Thread;
 class DataSink;
@@ -149,9 +150,10 @@ class FragmentInstanceState {
 
   /// All following member variables that are initialized to nullptr are set
   /// in Prepare().
-
   ExecNode* exec_tree_ = nullptr; // lives in obj_pool()
   RuntimeState* runtime_state_ = nullptr;  // lives in obj_pool()
+  /// Lives in obj_pool(). Not mutated after being initialized.
+  const PlanNode* plan_tree_ = nullptr;
 
   /// A 'fake mutex' to detect any race condition in accessing 'report_seq_no_' below.
   /// There should be only one thread doing status report at the same time.