You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/07/12 04:20:10 UTC

[6/7] incubator-impala git commit: IMPALA-4862: make resource profile consistent with backend behaviour

IMPALA-4862: make resource profile consistent with backend behaviour

This moves away from the PipelinedPlanNodeSet approach of enumerating
sets of concurrently-executing nodes because unions would require
creating many overlapping sets of nodes. The new approach computes
the peak resources during Open() and the peak resources between Open()
and Close() (i.e. while calling GetNext()) bottom-up for each plan node
in a fragment. The fragment resources are then combined to produce the
query resources.

The basic assumptions for the new resource estimates are:
* resources are acquired during or after the first call to Open()
  and released in Close().
* Blocking nodes call Open() on their child before acquiring
  their own resources (this required some backend changes).
* Blocking nodes call Close() on their children before returning
  from Open().
* The peak resource consumption of the query is the sum of the peak
  resource consumption of the independent fragments (except for parallel
  join build plans, where we can assume there will be synchronisation).
  This is conservative, but we don't synchronise fragment Open() and
  Close() across exchanges, so we can't make stronger assumptions in general.

Also compute the sum of minimum reservations. This will be useful
in the backend to determine exactly when all of the initial
reservations have been claimed from a shared pool of initial reservations.

Testing:
* Updated planner tests to reflect behavioural changes.
* Added extra resource requirement planner tests for unions, subplans,
  pipelines of blocking operators, and bushy join plans.
* Added single-node plans to resource-requirements tests. These have
  more complex plan trees inside a single fragment, which is useful
  for testing the peak resource requirement logic.

Change-Id: I492cf5052bb27e4e335395e2a8f8a3b07248ec9d
Reviewed-on: http://gerrit.cloudera.org:8080/7223
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/64fd0115
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/64fd0115
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/64fd0115

Branch: refs/heads/master
Commit: 64fd0115e5691cfaebb730651df003ffee38fd8e
Parents: b329e62
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jun 7 16:25:26 2017 -0500
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Jul 12 01:17:24 2017 +0000

----------------------------------------------------------------------
 be/src/exec/blocking-join-node.cc               |   39 +-
 be/src/exec/blocking-join-node.h                |   45 +-
 be/src/exec/exec-node.h                         |    8 +
 be/src/exec/hash-join-node.cc                   |    4 -
 be/src/exec/partitioned-aggregation-node.cc     |    5 +-
 be/src/exec/partitioned-hash-join-node.cc       |   15 +-
 be/src/exec/partitioned-hash-join-node.h        |   25 +-
 be/src/exec/sort-node.cc                        |   14 +-
 be/src/util/backend-gflag-util.cc               |    2 +
 common/thrift/BackendGflags.thrift              |    2 +
 common/thrift/Frontend.thrift                   |   15 +-
 .../org/apache/impala/analysis/Analyzer.java    |    1 -
 .../org/apache/impala/catalog/HBaseTable.java   |    8 -
 .../java/org/apache/impala/common/TreeNode.java |   16 +
 .../apache/impala/planner/AggregationNode.java  |    4 +-
 .../apache/impala/planner/AnalyticEvalNode.java |    6 +-
 .../org/apache/impala/planner/DataSink.java     |    2 +-
 .../impala/planner/DataSourceScanNode.java      |    4 +-
 .../org/apache/impala/planner/EmptySetNode.java |    4 +-
 .../org/apache/impala/planner/ExchangeNode.java |   14 +-
 .../apache/impala/planner/HBaseScanNode.java    |   13 +-
 .../org/apache/impala/planner/HashJoinNode.java |    4 +-
 .../org/apache/impala/planner/HdfsScanNode.java |    6 +-
 .../org/apache/impala/planner/JoinNode.java     |   47 +
 .../org/apache/impala/planner/KuduScanNode.java |    4 +-
 .../impala/planner/NestedLoopJoinNode.java      |    4 +-
 .../apache/impala/planner/ParallelPlanner.java  |   17 +-
 .../impala/planner/PipelinedPlanNodeSet.java    |  204 --
 .../org/apache/impala/planner/PlanFragment.java |   82 +-
 .../org/apache/impala/planner/PlanNode.java     |   80 +-
 .../java/org/apache/impala/planner/Planner.java |   89 +-
 .../apache/impala/planner/ResourceProfile.java  |   39 +-
 .../org/apache/impala/planner/SelectNode.java   |    4 +-
 .../impala/planner/SingularRowSrcNode.java      |    4 +-
 .../org/apache/impala/planner/SortNode.java     |    6 +-
 .../org/apache/impala/planner/SubplanNode.java  |   21 +-
 .../org/apache/impala/planner/UnionNode.java    |   21 +-
 .../org/apache/impala/planner/UnnestNode.java   |    4 +-
 .../apache/impala/service/BackendConfig.java    |    4 +
 .../org/apache/impala/service/Frontend.java     |   11 +-
 .../java/org/apache/impala/util/BitUtil.java    |    4 +-
 .../java/org/apache/impala/util/MathUtil.java   |   45 +
 .../org/apache/impala/util/BitUtilTest.java     |   49 +
 .../org/apache/impala/util/MathUtilTest.java    |   61 +
 .../queries/PlannerTest/constant-folding.test   |   16 +-
 .../queries/PlannerTest/disable-codegen.test    |   18 +-
 .../PlannerTest/fk-pk-join-detection.test       |   15 +-
 .../queries/PlannerTest/kudu-selectivity.test   |   18 +-
 .../queries/PlannerTest/mt-dop-validation.test  |   30 +-
 .../queries/PlannerTest/parquet-filtering.test  |    3 +
 .../PlannerTest/resource-requirements.test      | 2586 ++++++++++++++++--
 .../PlannerTest/sort-expr-materialization.test  |    8 +
 .../PlannerTest/spillable-buffer-sizing.test    |   78 +-
 .../queries/PlannerTest/tablesample.test        |   11 +
 .../queries/QueryTest/explain-level0.test       |    2 +-
 .../queries/QueryTest/explain-level1.test       |    2 +-
 .../queries/QueryTest/explain-level2.test       |    7 +-
 .../queries/QueryTest/explain-level3.test       |    5 +-
 .../queries/QueryTest/stats-extrapolation.test  |    5 +
 59 files changed, 3245 insertions(+), 615 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index cd3dd46..eeaccd6 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -144,21 +144,28 @@ void BlockingJoinNode::Close(RuntimeState* state) {
   ExecNode::Close(state);
 }
 
-void BlockingJoinNode::ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink,
-    Status* status) {
+void BlockingJoinNode::ProcessBuildInputAsync(
+    RuntimeState* state, DataSink* build_sink, Status* status) {
   DCHECK(status != nullptr);
   SCOPED_THREAD_COUNTER_MEASUREMENT(state->total_thread_statistics());
-  if (build_sink == nullptr){
-    *status = ProcessBuildInput(state);
-  } else {
-    *status = SendBuildInputToSink<true>(state, build_sink);
+  {
+    SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
+    *status = child(1)->Open(state);
+  }
+  if (status->ok()) *status = AcquireResourcesForBuild(state);
+  if (status->ok()) {
+    if (build_sink == nullptr){
+      *status = ProcessBuildInput(state);
+    } else {
+      *status = SendBuildInputToSink<true>(state, build_sink);
+    }
   }
   // IMPALA-1863: If the build-side thread failed, then we need to close the right
   // (build-side) child to avoid a potential deadlock between fragment instances.  This
   // is safe to do because while the build may have partially completed, it will not be
-  // probed.  BlockJoinNode::Open() will return failure as soon as child(0)->Open()
+  // probed. BlockingJoinNode::Open() will return failure as soon as child(0)->Open()
   // completes.
-  if (!status->ok()) child(1)->Close(state);
+  if (CanCloseBuildEarly() || !status->ok()) child(1)->Close(state);
   // Release the thread token as soon as possible (before the main thread joins
   // on it).  This way, if we had a chain of 10 joins using 1 additional thread,
   // we'd keep the additional thread busy the whole time.
@@ -180,9 +187,9 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
   // Inside a subplan we expect Open() to be called a number of times proportional to the
   // input data of the SubplanNode, so we prefer doing processing the build input in the
   // main thread, assuming that thread creation is expensive relative to a single subplan
-  // iteration.
+  // iteration. TODO-MT: disable async build thread when mt_dop >= 1.
   //
-  // In this block, we also compute the 'overlap' time for the left and right child.  This
+  // In this block, we also compute the 'overlap' time for the left and right child. This
   // is the time (i.e. clock reads) when the right child stops overlapping with the left
   // child. For the single threaded case, the left and right child never overlap. For the
   // build side in a different thread, the overlap stops when the left child Open()
@@ -217,6 +224,8 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
     // TODO: Remove this special-case behavior for subplans once we have proper
     // projection. See UnnestNode for details on the current projection implementation.
     RETURN_IF_ERROR(child(0)->Open(state));
+    RETURN_IF_ERROR(child(1)->Open(state));
+    RETURN_IF_ERROR(AcquireResourcesForBuild(state));
     if (build_sink == NULL) {
       RETURN_IF_ERROR(ProcessBuildInput(state));
     } else {
@@ -225,11 +234,16 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
   } else {
     // The left/right child never overlap. The overlap stops here.
     built_probe_overlap_stop_watch_.SetTimeCeiling();
+    // Open the build side before acquiring our own resources so that the build side
+    // can release any resources only used during its Open().
+    RETURN_IF_ERROR(child(1)->Open(state));
+    RETURN_IF_ERROR(AcquireResourcesForBuild(state));
     if (build_sink == NULL) {
       RETURN_IF_ERROR(ProcessBuildInput(state));
     } else {
       RETURN_IF_ERROR(SendBuildInputToSink<false>(state, build_sink));
     }
+    if (CanCloseBuildEarly()) child(1)->Close(state);
     RETURN_IF_ERROR(child(0)->Open(state));
   }
   return Status::OK();
@@ -259,11 +273,6 @@ template <bool ASYNC_BUILD>
 Status BlockingJoinNode::SendBuildInputToSink(RuntimeState* state,
     DataSink* build_sink) {
   {
-    CONDITIONAL_SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_, ASYNC_BUILD);
-    RETURN_IF_ERROR(child(1)->Open(state));
-  }
-
-  {
     SCOPED_TIMER(build_timer_);
     RETURN_IF_ERROR(build_sink->Open(state));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/be/src/exec/blocking-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index a14d979..1972b34 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -101,27 +101,35 @@ class BlockingJoinNode : public ExecNode {
   /// with the probe child Open().
   MonotonicStopWatch built_probe_overlap_stop_watch_;
 
+  // True for a join node subclass if the build side can be closed before the probe
+  // side is opened. Should be true wherever possible to reduce resource consumption.
+  // E.g. this is true for PartitionedHashJoinNode because it rematerializes the build rows
+  // and false for NestedLoopJoinNode because it accumulates RowBatches that may reference
+  // memory still owned by the build-side ExecNode tree.
+  // Changes here must be kept in sync with the planner's resource profile computation.
+  // TODO: IMPALA-4179: this should always be true once resource transfer has been fixed.
+  virtual bool CanCloseBuildEarly() const { return false; }
+
+  /// Called by BlockingJoinNode after opening child(1) succeeds and before
+  /// SendBuildInputToSink is called to allocate resources for this ExecNode.
+  virtual Status AcquireResourcesForBuild(RuntimeState* state) { return Status::OK(); }
+
   /// Processes the build-side input.
   /// Called from ProcessBuildInputAndOpenProbe() if the subclass does not provide a
-  /// DataSink to consume the build input.
+  /// DataSink to consume the build input. The build-side input is already open when
+  /// this is called.
   /// Note that this can be called concurrently with Open'ing the left child to
   /// increase parallelism. If, for example, the left child is another join node,
   /// it can start its own build at the same time.
   /// TODO: move all subclasses to use the DataSink interface and remove this method.
   virtual Status ProcessBuildInput(RuntimeState* state) = 0;
 
-  /// Processes the build-side input and opens the probe side. Will do both concurrently
-  /// if the plan shape and thread token availability permit it.
-  /// If 'build_sink' is non-NULL, sends the build-side input to 'build_sink'. Otherwise
-  /// calls ProcessBuildInput on the subclass.
+  /// Processes the build-side input, which should be already open, and opens the probe
+  /// side. Will do both concurrently if not in a subplan and an extra thread token is
+  /// available. If 'build_sink' is non-NULL, sends the build-side input to 'build_sink'.
+  /// Otherwise calls ProcessBuildInput on the subclass.
   Status ProcessBuildInputAndOpenProbe(RuntimeState* state, DataSink* build_sink);
 
-  /// Helper function to process the build input by sending it to a DataSink.
-  /// ASYNC_BUILD enables timers that impose some overhead but are required if the build
-  /// is processed concurrently with the Open() of the left child.
-  template <bool ASYNC_BUILD>
-  Status SendBuildInputToSink(RuntimeState* state, DataSink* build_sink);
-
   /// Set up 'current_probe_row_' to point to the first input row from the left child
   /// (probe side). Fills 'probe_batch_' with rows from the left child and updates
   /// 'probe_batch_pos_' to the index of the row in 'probe_batch_' after
@@ -199,12 +207,19 @@ class BlockingJoinNode : public ExecNode {
       const MonotonicStopWatch* child_overlap_timer);
 
  private:
-  /// The main function for the thread that processes the build input asynchronously.
-  /// Its status is returned in the 'status' promise. If 'build_sink' is non-NULL, it
-  /// is used for the build. Otherwise, ProcessBuildInput() is called on the subclass.
+  /// Helper function to process the build input by sending it to a DataSink. The build
+  /// input must already be open before calling this. ASYNC_BUILD enables timers that
+  /// impose some overhead but are required if the build is processed concurrently with
+  /// the Open() of the left child.
+  template <bool ASYNC_BUILD>
+  Status SendBuildInputToSink(RuntimeState* state, DataSink* build_sink);
+
+  /// The main function for the thread that opens the build side and processes the build
+  /// input asynchronously.  Its status is returned in the 'status' promise. If
+  /// 'build_sink' is non-NULL, it is used for the build. Otherwise, ProcessBuildInput()
+  /// is called on the subclass.
   void ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink, Status* status);
 };
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index b6dcd6e..a107f62 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -83,6 +83,14 @@ class ExecNode {
   /// If overridden in subclass, must first call superclass's Open().
   /// Open() is called after Prepare() or Reset(), i.e., possibly multiple times
   /// throughout the lifetime of this node.
+  ///
+  /// Memory resources must be acquired by an ExecNode only during or after the first
+  /// call to Open(). Blocking ExecNodes outside of a subplan must call Open() on their
+  /// child before acquiring their own resources to reduce the peak resource requirement.
+  /// This is particularly important if there are multiple blocking ExecNodes in a
+  /// pipeline because the lower nodes will release resources in Close() before the
+  /// Open() of their parent returns. The resource profile calculation in the frontend
+  /// relies on this when computing the peak resources required for a query.
   virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
 
   /// Retrieves rows and returns them via row_batch. Sets eos to true

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
index deee7f2..c35d05d 100644
--- a/be/src/exec/hash-join-node.cc
+++ b/be/src/exec/hash-join-node.cc
@@ -223,10 +223,6 @@ Status HashJoinNode::ProcessBuildInput(RuntimeState* state) {
   // row ptrs.  The row ptrs are copied into the hash table's internal structure so they
   // don't need to be stored in the build_pool_.
   RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker());
-  {
-    SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
-    RETURN_IF_ERROR(child(1)->Open(state));
-  }
   while (true) {
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(QueryMaintenance(state));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 83ebbc2..e5ab0f3 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -263,6 +263,9 @@ void PartitionedAggregationNode::Codegen(RuntimeState* state) {
 
 Status PartitionedAggregationNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  // Open the child before consuming resources in this node.
+  RETURN_IF_ERROR(child(0)->Open(state));
+
   RETURN_IF_ERROR(ExecNode::Open(state));
   if (ht_ctx_.get() != nullptr) RETURN_IF_ERROR(ht_ctx_->Open(state));
   RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
@@ -278,8 +281,6 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
     RETURN_IF_ERROR(CreateHashPartitions(0));
   }
 
-  RETURN_IF_ERROR(children_[0]->Open(state));
-
   // Streaming preaggregations do all processing in GetNext().
   if (is_streaming_preagg_) return Status::OK();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index f931743..927bf1a 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -156,11 +156,6 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ht_ctx_->Open(state));
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(other_join_conjunct_evals_, state));
 
-  if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-    RETURN_IF_ERROR(InitNullAwareProbePartition());
-    RETURN_IF_ERROR(InitNullProbeRows());
-  }
-
   // Check for errors and free local allocations before opening children.
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
@@ -181,6 +176,16 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   return Status::OK();
 }
 
+Status PartitionedHashJoinNode::AcquireResourcesForBuild(RuntimeState* state) {
+  if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+    // Initialize these partitions before doing the build so that the build does not
+    // use the reservation intended for them.
+    RETURN_IF_ERROR(InitNullAwareProbePartition());
+    RETURN_IF_ERROR(InitNullProbeRows());
+  }
+  return Status::OK();
+}
+
 Status PartitionedHashJoinNode::QueryMaintenance(RuntimeState* state) {
   // Build expressions may be evaluated during probing, so must be freed.
   // Probe side expr is not included in QueryMaintenance(). We cache the probe expression

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 68177ad..bf90ae4 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -108,18 +108,23 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
       const DescriptorTbl& descs);
   virtual ~PartitionedHashJoinNode();
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
-  virtual Status Prepare(RuntimeState* state);
-  virtual void Codegen(RuntimeState* state);
-  virtual Status Open(RuntimeState* state);
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
-  virtual void Close(RuntimeState* state);
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
+  virtual Status Prepare(RuntimeState* state) override;
+  virtual void Codegen(RuntimeState* state) override;
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+  virtual Status Reset(RuntimeState* state) override;
+  virtual void Close(RuntimeState* state) override;
 
  protected:
-  virtual Status QueryMaintenance(RuntimeState* state);
-  virtual void AddToDebugString(int indentation_level, std::stringstream* out) const;
-  virtual Status ProcessBuildInput(RuntimeState* state);
+  virtual Status QueryMaintenance(RuntimeState* state) override;
+  virtual void AddToDebugString(
+      int indentation_level, std::stringstream* out) const override;
+  virtual Status ProcessBuildInput(RuntimeState* state) override;
+
+  // Safe to close the build side early because we rematerialize the build rows always.
+  virtual bool CanCloseBuildEarly() const override { return true; }
+  virtual Status AcquireResourcesForBuild(RuntimeState* state) override;
 
  private:
   class ProbePartition;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 552ee89..fd42124 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -69,20 +69,17 @@ void SortNode::Codegen(RuntimeState* state) {
 
 Status SortNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  // Open the child before consuming resources in this node.
+  RETURN_IF_ERROR(child(0)->Open(state));
   RETURN_IF_ERROR(ExecNode::Open(state));
   RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));
   RETURN_IF_ERROR(sorter_->Open());
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
-  RETURN_IF_ERROR(child(0)->Open(state));
 
   // The child has been opened and the sorter created. Sort the input.
   // The final merge is done on-demand as rows are requested in GetNext().
   RETURN_IF_ERROR(SortInput(state));
-
-  // Unless we are inside a subplan expecting to call Open()/GetNext() on the child
-  // again, the child can be closed at this point.
-  if (!IsInSubplan()) child(0)->Close(state);
   return Status::OK();
 }
 
@@ -163,12 +160,17 @@ Status SortNode::SortInput(RuntimeState* state) {
   RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
   bool eos;
   do {
-    batch.Reset();
     RETURN_IF_ERROR(child(0)->GetNext(state, &batch, &eos));
     RETURN_IF_ERROR(sorter_->AddBatch(&batch));
+    batch.Reset();
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(QueryMaintenance(state));
   } while(!eos);
+
+  // Unless we are inside a subplan expecting to call Open()/GetNext() on the child
+  // again, the child can be closed at this point to release resources.
+  if (!IsInSubplan()) child(0)->Close(state);
+
   RETURN_IF_ERROR(sorter_->InputDone());
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 96c8f87..ab66a73 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -27,6 +27,7 @@
 DECLARE_bool(load_catalog_in_background);
 DECLARE_bool(load_auth_to_local_rules);
 DECLARE_bool(enable_stats_extrapolation);
+DECLARE_bool(enable_partitioned_hash_join);
 DECLARE_int32(non_impala_java_vlog);
 DECLARE_int32(read_size);
 DECLARE_int32(num_metadata_loading_threads);
@@ -71,6 +72,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_lineage_event_log_dir(FLAGS_lineage_event_log_dir);
   cfg.__set_local_library_path(FLAGS_local_library_dir);
   cfg.__set_kudu_operation_timeout_ms(FLAGS_kudu_operation_timeout_ms);
+  cfg.__set_enable_partitioned_hash_join(FLAGS_enable_partitioned_hash_join);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index c76d8e0..3a410c2 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -56,4 +56,6 @@ struct TBackendGflags {
   16: required i32 kudu_operation_timeout_ms
 
   17: required i32 initial_hms_cnxn_timeout_s
+
+  18: required bool enable_partitioned_hash_join
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index e07fe84..79da0d6 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -389,14 +389,23 @@ struct TQueryExecRequest {
   // Estimated per-host peak memory consumption in bytes. Used for resource management.
   8: optional i64 per_host_mem_estimate
 
-  // Minimum buffer reservation required per host in bytes.
+  // Minimum query-wide buffer reservation required per host in bytes. This is the peak
+  // minimum reservation that may be required by the concurrently-executing operators at
+  // any point in query execution. It may be less than the initial reservation total
+  // claims (below) if execution of some operators never overlaps, which allows reuse of
+  // reservations.
   9: optional i64 per_host_min_reservation;
 
+  // Total of the initial buffer reservations that we expect to be claimed per host.
+  // I.e. the sum over all operators in all fragment instances that execute on that host.
+  // Measured in bytes.
+  10: optional i64 per_host_initial_reservation_total_claims;
+
   // List of replica hosts.  Used by the host_idx field of TScanRangeLocation.
-  10: required list<Types.TNetworkAddress> host_list
+  11: required list<Types.TNetworkAddress> host_list
 
   // Column lineage graph
-  11: optional LineageGraph.TLineageGraph lineage_graph
+  12: optional LineageGraph.TLineageGraph lineage_graph
 }
 
 enum TCatalogOpType {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 33fbb8b..365091d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -19,7 +19,6 @@ package org.apache.impala.analysis;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
index 4f60c96..43cf2b9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
@@ -19,7 +19,6 @@ package org.apache.impala.catalog;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -33,10 +32,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
@@ -670,11 +667,6 @@ public class HBaseTable extends Table {
     return hbaseTableName_;
   }
 
-  public int getNumNodes() {
-    // TODO: implement
-    return 100;
-  }
-
   @Override
   public TCatalogObjectType getCatalogObjectType() {
     return TCatalogObjectType.TABLE;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/common/TreeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/TreeNode.java b/fe/src/main/java/org/apache/impala/common/TreeNode.java
index 730fd44..b89f133 100644
--- a/fe/src/main/java/org/apache/impala/common/TreeNode.java
+++ b/fe/src/main/java/org/apache/impala/common/TreeNode.java
@@ -67,6 +67,22 @@ public abstract class TreeNode<NodeType extends TreeNode<NodeType>> {
   }
 
   /**
+   * Return list of all nodes of the tree rooted at 'this', obtained
+   * through post-order traversal.
+   */
+  public <C extends TreeNode<NodeType>> ArrayList<C> getNodesPostOrder() {
+    ArrayList<C> result = new ArrayList<C>();
+    getNodesPostOrderAux(result);
+    return result;
+  }
+
+  protected <C extends TreeNode<NodeType>> void getNodesPostOrderAux(
+      ArrayList<C> result) {
+    for (NodeType child: children_) child.getNodesPostOrderAux(result);
+    result.add((C) this);
+  }
+
+  /**
    * Count the total number of nodes in this tree. Leaf node will return 1.
    * Non-leaf node will include all its children.
    */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index d1b7419..004c84e 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -281,7 +281,7 @@ public class AggregationNode extends PlanNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkNotNull(
         fragment_, "PlanNode must be placed into a fragment before calling this method.");
     long perInstanceCardinality = fragment_.getPerInstanceNdv(
@@ -319,7 +319,7 @@ public class AggregationNode extends PlanNode {
       perInstanceMinBuffers = bufferSize * minBuffers;
     }
 
-    resourceProfile_ =
+    nodeResourceProfile_ =
         new ResourceProfile(perInstanceMemEstimate, perInstanceMinBuffers);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index f17226b..d4bafcf 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -100,7 +100,7 @@ public class AnalyticEvalNode extends PlanNode {
   }
 
   @Override
-  public boolean isBlockingNode() { return true; }
+  public boolean isBlockingNode() { return false; }
   public List<Expr> getPartitionExprs() { return partitionExprs_; }
   public List<OrderByElement> getOrderByElements() { return orderByElements_; }
 
@@ -242,7 +242,7 @@ public class AnalyticEvalNode extends PlanNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkNotNull(
         fragment_, "PlanNode must be placed into a fragment before calling this method.");
     // TODO: come up with estimate based on window
@@ -250,6 +250,6 @@ public class AnalyticEvalNode extends PlanNode {
 
     // Must be kept in sync with MIN_REQUIRED_BUFFERS in AnalyticEvalNode in be.
     long perInstanceMinBufferBytes = 2 * getDefaultSpillableBufferBytes();
-    resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, perInstanceMinBufferBytes);
+    nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, perInstanceMinBufferBytes);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/DataSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSink.java b/fe/src/main/java/org/apache/impala/planner/DataSink.java
index 7fc0c83..6acb4b9 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSink.java
@@ -34,7 +34,7 @@ public abstract class DataSink {
 
   // resource requirements and estimates for this plan node.
   // set in computeResourceProfile()
-  protected ResourceProfile resourceProfile_ = null;
+  protected ResourceProfile resourceProfile_ = ResourceProfile.invalid();
 
   /**
    * Return an explain string for the DataSink. Each line of the explain will be prefixed

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index 6dc8967..879d9d8 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -330,9 +330,9 @@ public class DataSourceScanNode extends ScanNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: What's a good estimate of memory consumption?
-    resourceProfile_ = new ResourceProfile(1024L * 1024L * 1024L, 0);
+    nodeResourceProfile_ = new ResourceProfile(1024L * 1024L * 1024L, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
index 174051d..0d0acc9 100644
--- a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
@@ -60,9 +60,9 @@ public class EmptySetNode extends PlanNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
-    resourceProfile_ = new ResourceProfile(0, 0);
+    nodeResourceProfile_ = new ResourceProfile(0, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index 3f4ea1e..478a054 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -17,9 +17,6 @@
 
 package org.apache.impala.planner;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.SortInfo;
@@ -185,9 +182,16 @@ public class ExchangeNode extends PlanNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
-    resourceProfile_ =  new ResourceProfile(0, 0);
+    nodeResourceProfile_ =  new ResourceProfile(0, 0);
+  }
+
+  @Override
+  public ExecPhaseResourceProfiles computeTreeResourceProfiles(
+      TQueryOptions queryOptions) {
+    // Exclude the children's resources: they execute in a different plan fragment.
+    return new ExecPhaseResourceProfiles(nodeResourceProfile_, nodeResourceProfile_);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index 48b772a..bbecbf1 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -24,8 +24,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.filter.CompareFilter;
@@ -54,6 +52,7 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.util.MembershipSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -217,8 +216,10 @@ public class HBaseScanNode extends ScanNode {
       LOG.trace("computeStats HbaseScan: cardinality=" + Long.toString(cardinality_));
     }
 
-    // TODO: take actual regions into account
-    numNodes_ = Math.max(1, tbl.getNumNodes());
+    // Assume that each node in the cluster gets a scan range, unless there are fewer
+    // scan ranges than nodes.
+    numNodes_ = Math.max(1,
+        Math.min(scanRanges_.size(), MembershipSnapshot.getCluster().numNodes()));
     if (LOG.isTraceEnabled()) {
       LOG.trace("computeStats HbaseScan: #nodes=" + Integer.toString(numNodes_));
     }
@@ -494,9 +495,9 @@ public class HBaseScanNode extends ScanNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: What's a good estimate of memory consumption?
-    resourceProfile_ =  new ResourceProfile(1024L * 1024L * 1024L, 0);
+    nodeResourceProfile_ =  new ResourceProfile(1024L * 1024L * 1024L, 0);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index 48492b1..e828125 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -197,7 +197,7 @@ public class HashJoinNode extends JoinNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     long perInstanceMemEstimate;
     long perInstanceDataBytes;
     int numInstances = fragment_.getNumInstances(queryOptions.getMt_dop());
@@ -234,6 +234,6 @@ public class HashJoinNode extends JoinNode {
     }
 
     long perInstanceMinBufferBytes = bufferSize * minBuffers;
-    resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, perInstanceMinBufferBytes);
+    nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, perInstanceMinBufferBytes);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index fa9038a..0ba5bc6 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1018,10 +1018,10 @@ public class HdfsScanNode extends ScanNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkNotNull(scanRanges_, "Cost estimation requires scan ranges.");
     if (scanRanges_.isEmpty()) {
-      resourceProfile_ = new ResourceProfile(0, 0);
+      nodeResourceProfile_ = new ResourceProfile(0, 0);
       return;
     }
     Preconditions.checkState(0 < numNodes_ && numNodes_ <= scanRanges_.size());
@@ -1075,7 +1075,7 @@ public class HdfsScanNode extends ScanNode {
           PrintUtils.printBytes(perHostUpperBound)));
       perInstanceMemEstimate = perHostUpperBound;
     }
-    resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
+    nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/JoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index 47fa3e5..5e0beb4 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -33,6 +33,8 @@ import org.apache.impala.catalog.Table;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TJoinDistributionMode;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TQueryOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -630,4 +632,49 @@ public abstract class JoinNode extends PlanNode {
     eqJoinConjuncts_ = orderConjunctsByCost(eqJoinConjuncts_);
     otherJoinConjuncts_ = orderConjunctsByCost(otherJoinConjuncts_);
   }
+
+  @Override
+  public ExecPhaseResourceProfiles computeTreeResourceProfiles(
+      TQueryOptions queryOptions) {
+    Preconditions.checkState(isBlockingJoinNode(), "Only blocking join nodes supported");
+
+    ExecPhaseResourceProfiles buildSideProfile =
+        getChild(1).computeTreeResourceProfiles(queryOptions);
+    ExecPhaseResourceProfiles probeSideProfile =
+        getChild(0).computeTreeResourceProfiles(queryOptions);
+
+    // The peak resource consumption of the build phase is either during the Open() of
+    // the build side or while we're doing the join build and calling GetNext() on the
+    // build side.
+    ResourceProfile buildPhaseProfile = buildSideProfile.duringOpenProfile.max(
+        buildSideProfile.postOpenProfile.sum(nodeResourceProfile_));
+
+    ResourceProfile finishedBuildProfile = nodeResourceProfile_;
+    if (this instanceof NestedLoopJoinNode
+        || !BackendConfig.INSTANCE.isPartitionedHashJoinEnabled()) {
+      // These exec node implementations may hold references into the build side, which
+      // prevents closing of the build side in a timely manner. This means we have to
+      // count the post-open resource consumption of the build side in the same way as
+      // the other in-memory data structures.
+      // TODO: IMPALA-4179: remove this workaround
+      finishedBuildProfile = buildSideProfile.postOpenProfile.sum(nodeResourceProfile_);
+    }
+
+    // Peak resource consumption of this subtree during Open().
+    ResourceProfile duringOpenProfile;
+    if (queryOptions.getMt_dop() == 0) {
+      // The build and probe sides can be open and therefore consume resources
+      // simultaneously when mt_dop = 0 because of the async build thread.
+      duringOpenProfile = buildPhaseProfile.sum(probeSideProfile.duringOpenProfile);
+    } else {
+      // Open() of the probe side happens after the build completes.
+      duringOpenProfile = buildPhaseProfile.max(
+          finishedBuildProfile.sum(probeSideProfile.duringOpenProfile));
+    }
+
+    // After Open(), the probe side remains open and the join build remains in memory.
+    ResourceProfile probePhaseProfile =
+        finishedBuildProfile.sum(probeSideProfile.postOpenProfile);
+    return new ExecPhaseResourceProfiles(duringOpenProfile, probePhaseProfile);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index b1aa5ba..4687129 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -268,8 +268,8 @@ public class KuduScanNode extends ScanNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
-    resourceProfile_ = new ResourceProfile(0, 0);
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
+    nodeResourceProfile_ = new ResourceProfile(0, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
index e69f97b..0ec8e4f 100644
--- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
@@ -77,7 +77,7 @@ public class NestedLoopJoinNode extends JoinNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     long perInstanceMemEstimate;
     if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1
         || numNodes_ == 0) {
@@ -86,7 +86,7 @@ public class NestedLoopJoinNode extends JoinNode {
       perInstanceMemEstimate =
           (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_);
     }
-    resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
+    nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/ParallelPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ParallelPlanner.java b/fe/src/main/java/org/apache/impala/planner/ParallelPlanner.java
index f5cc5d8..979302b 100644
--- a/fe/src/main/java/org/apache/impala/planner/ParallelPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/ParallelPlanner.java
@@ -81,18 +81,14 @@ public class ParallelPlanner {
     List<JoinNode> joins = Lists.newArrayList();
     collectJoins(fragment.getPlanRoot(), joins);
     if (!joins.isEmpty()) {
-      List<String> joinIds = Lists.newArrayList();
-      for (JoinNode join: joins) joinIds.add(join.getId().toString());
-
       if (buildCohortId == null) buildCohortId = cohortIdGenerator_.getNextId();
       for (JoinNode join: joins) createBuildPlan(join, buildCohortId);
     }
 
-    if (!fragment.getChildren().isEmpty()) {
-      List<String> ids = Lists.newArrayList();
-      for (PlanFragment c: fragment.getChildren()) ids.add(c.getId().toString());
-    }
     for (PlanFragment child: fragment.getChildren()) {
+      // We already recursed on the join build fragment in createBuildPlan().
+      if (child.getSink() instanceof JoinBuildSink) continue;
+      // Propagate the plan and cohort IDs to children that are part of the same plan.
       child.setPlanId(fragment.getPlanId());
       child.setCohortId(fragment.getCohortId());
       createBuildPlans(child, buildCohortId);
@@ -171,19 +167,16 @@ public class ParallelPlanner {
         join.getChild(1), join.getFragment().getDataPartition());
     buildFragment.setSink(buildSink);
 
-    // move input fragments
+    // Fix up the child/parent relationships in the PlanFragment tree.
     for (int i = 0; i < exchNodes.size(); ++i) {
       Preconditions.checkState(exchNodes.get(i).getFragment() == buildFragment);
       join.getFragment().removeChild(inputFragments.get(i));
       buildFragment.getChildren().add(inputFragments.get(i));
     }
-
-    // compute the resource profile for the newly-added build sink.
-    buildSink.computeResourceProfile(ctx_.getQueryOptions());
+    join.getFragment().addChild(buildFragment);
 
     // assign plan and cohort id
     buildFragment.setPlanId(planIdGenerator_.getNextId());
-    PlanId parentPlanId = join.getFragment().getPlanId();
     buildFragment.setCohortId(cohortId);
 
     planRoots_.add(buildFragment);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/PipelinedPlanNodeSet.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PipelinedPlanNodeSet.java b/fe/src/main/java/org/apache/impala/planner/PipelinedPlanNodeSet.java
deleted file mode 100644
index c2ae0fd..0000000
--- a/fe/src/main/java/org/apache/impala/planner/PipelinedPlanNodeSet.java
+++ /dev/null
@@ -1,204 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.planner;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.impala.thrift.TQueryOptions;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Represents a set of PlanNodes and DataSinks that execute and consume resources
- * concurrently. PlanNodes and DataSinks in such a pipelined plan node set may belong
- * to different plan fragments because data is streamed across fragments.
- *
- * For example, a series of left-deep joins consists of two plan node sets. The first
- * set contains all build-side nodes. The second set contains the leftmost
- * scan. Both sets contain all join nodes because they execute and consume
- * resources during the build and probe phases. Similarly, all nodes below a 'blocking'
- * node (e.g, an AggregationNode) are placed into a different plan node set than the
- * nodes above it, but the blocking node itself belongs to both sets.
- */
-public class PipelinedPlanNodeSet {
-  private final static Logger LOG = LoggerFactory.getLogger(PipelinedPlanNodeSet.class);
-
-  // Minimum per-host resource requirements to ensure that no plan node set can have
-  // estimates of zero, even if the contained PlanNodes have estimates of zero.
-  public static final long MIN_PER_HOST_MEM_ESTIMATE_BYTES = 10 * 1024 * 1024;
-
-  // List of plan nodes that execute and consume resources concurrently.
-  private final ArrayList<PlanNode> planNodes_ = Lists.newArrayList();
-
-  // DataSinks that execute and consume resources concurrently.
-  // Primarily used for estimating the cost of insert queries.
-  private final List<DataSink> dataSinks_ = Lists.newArrayList();
-
-  private void addNode(PlanNode node) {
-    Preconditions.checkNotNull(node.getFragment());
-    planNodes_.add(node);
-  }
-
-  private void addSink(DataSink sink) {
-    Preconditions.checkNotNull(sink);
-    dataSinks_.add(sink);
-  }
-
-  /**
-   * Computes the per-host resource profile of this plan node set.
-   *
-   * If there are no nodes included in the estimate, the returned estimate will not be
-   * valid.
-   */
-  public ResourceProfile computePerHostResources(TQueryOptions queryOptions) {
-    Set<PlanFragment> uniqueFragments = Sets.newHashSet();
-
-    // Distinguish the per-host memory estimates for scan nodes and non-scan nodes to
-    // get a tighter estimate on the amount of memory required by multiple concurrent
-    // scans. The memory required by all concurrent scans of the same type (Hdfs/Hbase)
-    // cannot exceed the per-host upper memory bound for that scan type. Intuitively,
-    // the amount of I/O buffers is limited by the disk bandwidth.
-    long hbaseScanMemEstimate = 0L;
-    long hdfsScanMemEstimate = 0L;
-    long nonScanMemEstimate = 0L;
-    long minReservationBytes = 0L;
-    int numNodesIncluded = 0;
-
-    for (PlanNode node : planNodes_) {
-      PlanFragment fragment = node.getFragment();
-      // Multiple instances of a partitioned fragment may execute per host
-      int instancesPerHost = fragment.getNumInstancesPerHost(queryOptions.getMt_dop());
-
-      ResourceProfile nodeProfile = node.getResourceProfile();
-      Preconditions.checkState(nodeProfile.getMemEstimateBytes() >= 0);
-      long memEstimate = instancesPerHost * nodeProfile.getMemEstimateBytes();
-      ++numNodesIncluded;
-      uniqueFragments.add(fragment);
-      if (node instanceof HBaseScanNode) {
-        hbaseScanMemEstimate += memEstimate;
-      } else if (node instanceof HdfsScanNode) {
-        hdfsScanMemEstimate += memEstimate;
-      } else {
-        nonScanMemEstimate += memEstimate;
-      }
-      Preconditions.checkState(nodeProfile.getMinReservationBytes() >= 0);
-      minReservationBytes += instancesPerHost * nodeProfile.getMinReservationBytes();
-    }
-
-    if (queryOptions.getMt_dop() == 0) {
-      // The thread tokens for the non-MT path impose a limit on the memory that can
-      // be consumed by concurrent scans.
-      hbaseScanMemEstimate =
-          Math.min(hbaseScanMemEstimate, HBaseScanNode.getPerHostMemUpperBound());
-      hdfsScanMemEstimate =
-          Math.min(hdfsScanMemEstimate, HdfsScanNode.getPerHostMemUpperBound());
-    }
-
-    long dataSinkMemEstimate = 0L;
-    for (DataSink sink: dataSinks_) {
-      PlanFragment fragment = sink.getFragment();
-      // Sanity check that this plan-node set has at least one PlanNode of fragment.
-      Preconditions.checkState(uniqueFragments.contains(fragment));
-      int instancesPerHost = fragment.getNumInstancesPerHost(queryOptions.getMt_dop());
-
-      ResourceProfile sinkProfile = sink.getResourceProfile();
-      Preconditions.checkState(sinkProfile.getMemEstimateBytes() >= 0);
-      dataSinkMemEstimate += instancesPerHost * sinkProfile.getMemEstimateBytes();
-      Preconditions.checkState(sinkProfile.getMinReservationBytes() >= 0);
-      minReservationBytes += instancesPerHost * sinkProfile.getMinReservationBytes();
-    }
-
-    // Combine the memory estimates of all sinks, scans nodes and non-scan nodes.
-    long perHostMemEstimate =
-        Math.max(MIN_PER_HOST_MEM_ESTIMATE_BYTES, hdfsScanMemEstimate
-                + hbaseScanMemEstimate + nonScanMemEstimate + dataSinkMemEstimate);
-    // This plan node set might only have unpartitioned fragments and be invalid.
-    return numNodesIncluded > 0 ?
-        new ResourceProfile(perHostMemEstimate, minReservationBytes) :
-          ResourceProfile.invalid();
-  }
-
-  /**
-   * Computes and returns the pipelined plan node sets of the given plan.
-   */
-  public static ArrayList<PipelinedPlanNodeSet> computePlanNodeSets(PlanNode root) {
-    ArrayList<PipelinedPlanNodeSet> planNodeSets =
-        Lists.newArrayList(new PipelinedPlanNodeSet());
-    computePlanNodeSets(root, planNodeSets.get(0), null, planNodeSets);
-    return planNodeSets;
-  }
-
-  /**
-   * Populates 'planNodeSets' by recursively traversing the plan tree rooted at 'node'
-   * The plan node sets are computed top-down. As a result, the plan node sets are added
-   * in reverse order of their runtime execution.
-   *
-   * Nodes are generally added to lhsSet. Joins are treated specially in that their
-   * left child is added to lhsSet and their right child to rhsSet to make sure
-   * that concurrent join builds end up in the same plan node set.
-   */
-  private static void computePlanNodeSets(PlanNode node, PipelinedPlanNodeSet lhsSet,
-      PipelinedPlanNodeSet rhsSet, ArrayList<PipelinedPlanNodeSet> planNodeSets) {
-    lhsSet.addNode(node);
-    if (node == node.getFragment().getPlanRoot() && node.getFragment().hasSink()) {
-      lhsSet.addSink(node.getFragment().getSink());
-    }
-
-    if (node instanceof JoinNode && ((JoinNode)node).isBlockingJoinNode()) {
-      // Create a new set for the right-hand sides of joins if necessary.
-      if (rhsSet == null) {
-        rhsSet = new PipelinedPlanNodeSet();
-        planNodeSets.add(rhsSet);
-      }
-      // The join node itself is added to the lhsSet (above) and the rhsSet.
-      rhsSet.addNode(node);
-      computePlanNodeSets(node.getChild(1), rhsSet, null, planNodeSets);
-      computePlanNodeSets(node.getChild(0), lhsSet, rhsSet, planNodeSets);
-      return;
-    }
-
-    if (node.isBlockingNode()) {
-      // We add blocking nodes to two plan node sets because they require resources while
-      // consuming their input (execution of the preceding set) and while they
-      // emit their output (execution of the following set).
-      // TODO: IMPALA-4862: this logic does not accurately reflect the behaviour of
-      // concurrent join builds in the backend
-      lhsSet = new PipelinedPlanNodeSet();
-      lhsSet.addNode(node);
-      planNodeSets.add(lhsSet);
-      // Join builds under this blocking node belong in a new rhsSet.
-      rhsSet = null;
-    }
-
-    // Assume that non-join, non-blocking nodes with multiple children
-    // (e.g., ExchangeNodes) consume their inputs in an arbitrary order,
-    // i.e., all child subtrees execute concurrently.
-    // TODO: IMPALA-4862: can overestimate resource consumption of UnionNodes - the
-    // execution of union branches is serialised within a fragment (but not across
-    // fragment boundaries).
-    for (PlanNode child: node.getChildren()) {
-      computePlanNodeSets(child, lhsSet, rhsSet, planNodeSets);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index 79c953a..05bcf25 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -28,6 +28,7 @@ import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.TreeNode;
+import org.apache.impala.planner.PlanNode.ExecPhaseResourceProfiles;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPartitionType;
 import org.apache.impala.thrift.TPlanFragment;
@@ -104,6 +105,11 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   // if the output is UNPARTITIONED, it is being broadcast
   private DataPartition outputPartition_;
 
+  // Resource requirements and estimates for all instances of this plan fragment running
+  // on a host. Initialized with a dummy value. Gets set correctly in
+  // computeResourceProfile().
+  private ResourceProfile perHostResourceProfile_ = ResourceProfile.invalid();
+
   /**
    * C'tor for fragment with specific partition; the output is by default broadcast.
    */
@@ -128,11 +134,12 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   }
 
   /**
-   * Collect all PlanNodes that belong to the exec tree of this fragment.
+   * Collect and return a new list of all PlanNodes in the exec tree of this fragment.
    */
-  public void collectPlanNodes(List<PlanNode> nodes) {
-    Preconditions.checkNotNull(nodes);
+  public List<PlanNode> collectPlanNodes() {
+    List<PlanNode> nodes = Lists.newArrayList();
     collectPlanNodesHelper(planRoot_, nodes);
+    return nodes;
   }
 
   private void collectPlanNodesHelper(PlanNode root, List<PlanNode> nodes) {
@@ -148,14 +155,13 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   public List<Expr> getOutputExprs() { return outputExprs_; }
 
   /**
-   * Finalize plan tree and create stream sink, if needed.
-   * Computes resource profiles for all nodes and sinks in this fragment.
+   * Do any final work to set up the ExchangeNodes and DataStreamSinks for this fragment.
    * If this fragment is hash partitioned, ensures that the corresponding partition
    * exprs of all hash-partitioning senders are cast to identical types.
    * Otherwise, the hashes generated for identical partition values may differ
    * among senders if the partition-expr types are not identical.
    */
-  public void finalize(Analyzer analyzer)
+  public void finalizeExchanges(Analyzer analyzer)
       throws InternalException, NotImplementedException {
     if (destNode_ != null) {
       Preconditions.checkState(sink_ == null);
@@ -164,7 +170,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       streamSink.setFragment(this);
       sink_ = streamSink;
     }
-    computeResourceProfile(analyzer);
 
     if (!dataPartition_.isHashPartitioned()) return;
 
@@ -202,16 +207,38 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   }
 
   /**
-   * Compute the resource profile of the fragment. Must be called after all the
-   * plan nodes and sinks are added to the fragment.
+   * Compute the peak resource profile for all instances of this fragment per host. Must
+   * be called after all the plan nodes and sinks are added to the fragment and resource
+   * profiles of all child fragments have been computed.
    */
-  private void computeResourceProfile(Analyzer analyzer) {
+  public void computeResourceProfile(Analyzer analyzer) {
+    // Compute resource profiles for all plan nodes and sinks in the fragment.
     sink_.computeResourceProfile(analyzer.getQueryOptions());
-    List<PlanNode> nodes = Lists.newArrayList();
-    collectPlanNodes(nodes);
-    for (PlanNode node: nodes) {
-      node.computeResourceProfile(analyzer.getQueryOptions());
+    for (PlanNode node: collectPlanNodes()) {
+      node.computeNodeResourceProfile(analyzer.getQueryOptions());
+    }
+
+    if (sink_ instanceof JoinBuildSink) {
+      // Resource consumption of fragments with join build sinks is included in the
+      // parent fragment because the join node blocks waiting for the join build to
+      // finish - see JoinNode.computeTreeResourceProfiles().
+      perHostResourceProfile_ = ResourceProfile.invalid();
+      return;
     }
+
+    ExecPhaseResourceProfiles planTreeProfile =
+        planRoot_.computeTreeResourceProfiles(analyzer.getQueryOptions());
+    // The sink is opened after the plan tree, so it only adds to the post-Open() peak.
+    ResourceProfile fInstancePostOpenProfile =
+        planTreeProfile.postOpenProfile.sum(sink_.getResourceProfile());
+    ResourceProfile fInstanceProfile =
+        planTreeProfile.duringOpenProfile.max(fInstancePostOpenProfile);
+    int numInstances = getNumInstancesPerHost(analyzer.getQueryOptions().getMt_dop());
+    perHostResourceProfile_ = fInstanceProfile.multiply(numInstances);
+  }
+
+  public ResourceProfile getPerHostResourceProfile() {
+    return perHostResourceProfile_;
   }
 
   /**
@@ -313,17 +340,15 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       prefix = "  ";
       rootPrefix = "  ";
       detailPrefix = prefix + "|  ";
-      str.append(getFragmentHeaderString(queryOptions.getMt_dop()));
-      str.append("\n");
+      str.append(getFragmentHeaderString("", "", queryOptions.getMt_dop()));
       if (sink_ != null && sink_ instanceof DataStreamSink) {
         str.append(
             sink_.getExplainString(rootPrefix, detailPrefix, queryOptions, detailLevel));
       }
     } else if (detailLevel == TExplainLevel.EXTENDED) {
       // Print a fragment prefix displaying the # nodes and # instances
-      str.append(rootPrefix);
-      str.append(getFragmentHeaderString(queryOptions.getMt_dop()));
-      str.append("\n");
+      str.append(
+          getFragmentHeaderString(rootPrefix, detailPrefix, queryOptions.getMt_dop()));
       rootPrefix = prefix;
     }
 
@@ -348,12 +373,22 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   /**
    * Get a header string for a fragment in an explain plan.
    */
-  public String getFragmentHeaderString(int mt_dop) {
+  public String getFragmentHeaderString(String firstLinePrefix, String detailPrefix,
+      int mt_dop) {
     StringBuilder builder = new StringBuilder();
-    builder.append(String.format("%s:PLAN FRAGMENT [%s]", fragmentId_.toString(),
-        dataPartition_.getExplainString()));
+    builder.append(String.format("%s%s:PLAN FRAGMENT [%s]", firstLinePrefix,
+        fragmentId_.toString(), dataPartition_.getExplainString()));
     builder.append(PrintUtils.printNumHosts(" ", getNumNodes()));
     builder.append(PrintUtils.printNumInstances(" ", getNumInstances(mt_dop)));
+    builder.append("\n");
+    builder.append(detailPrefix);
+    builder.append("Per-Host Resources: ");
+    if (sink_ instanceof JoinBuildSink) {
+      builder.append("included in parent fragment");
+    } else {
+      builder.append(perHostResourceProfile_.getExplainString());
+    }
+    builder.append("\n");
     return builder.toString();
   }
 
@@ -419,8 +454,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
    */
   public void verifyTree() {
     // PlanNode.fragment_ is set correctly
-    List<PlanNode> nodes = Lists.newArrayList();
-    collectPlanNodes(nodes);
+    List<PlanNode> nodes = collectPlanNodes();
     List<PlanNode> exchNodes = Lists.newArrayList();
     for (PlanNode node: nodes) {
       if (node instanceof ExchangeNode) exchNodes.add(node);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 8448da5..9723c4a 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -115,7 +115,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   // resource requirements and estimates for this plan node.
   // Initialized with a dummy value. Gets set correctly in
   // computeResourceProfile().
-  protected ResourceProfile resourceProfile_ = ResourceProfile.invalid();
+  protected ResourceProfile nodeResourceProfile_ = ResourceProfile.invalid();
 
   // sum of tupleIds_' avgSerializedSizes; set in computeStats()
   protected float avgRowSize_;
@@ -192,7 +192,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   public boolean hasLimit() { return limit_ > -1; }
   public long getCardinality() { return cardinality_; }
   public int getNumNodes() { return numNodes_; }
-  public ResourceProfile getResourceProfile() { return resourceProfile_; }
+  public ResourceProfile getNodeResourceProfile() { return nodeResourceProfile_; }
   public float getAvgRowSize() { return avgRowSize_; }
   public void setFragment(PlanFragment fragment) { fragment_ = fragment; }
   public PlanFragment getFragment() { return fragment_; }
@@ -306,7 +306,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
       // Print resource profile.
       expBuilder.append(detailPrefix);
-      expBuilder.append(resourceProfile_.getExplainString());
+      expBuilder.append(nodeResourceProfile_.getExplainString());
       expBuilder.append("\n");
 
       // Print tuple ids, row size and cardinality.
@@ -344,10 +344,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
       PlanFragment childFragment = children_.get(0).fragment_;
       if (fragment_ != childFragment && detailLevel == TExplainLevel.EXTENDED) {
         // we're crossing a fragment boundary - print the fragment header.
-        expBuilder.append(prefix);
-        expBuilder.append(
-            childFragment.getFragmentHeaderString(queryOptions.getMt_dop()));
-        expBuilder.append("\n");
+        expBuilder.append(childFragment.getFragmentHeaderString(prefix, prefix,
+            queryOptions.getMt_dop()));
       }
       expBuilder.append(
           children_.get(0).getExplainString(prefix, prefix, queryOptions, detailLevel));
@@ -390,7 +388,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
 
     TExecStats estimatedStats = new TExecStats();
     estimatedStats.setCardinality(cardinality_);
-    estimatedStats.setMemory_used(resourceProfile_.getMemEstimateBytes());
+    estimatedStats.setMemory_used(nodeResourceProfile_.getMemEstimateBytes());
     msg.setLabel(getDisplayLabel());
     msg.setLabel_detail(getDisplayLabelDetail());
     msg.setEstimated_stats(estimatedStats);
@@ -616,15 +614,71 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   public boolean isBlockingNode() { return false; }
 
   /**
-   * Compute resources consumed when executing this PlanNode, initializing
-   * 'resource_profile_'. May only be called after this PlanNode has been placed in a
-   * PlanFragment because the cost computation is dependent on the enclosing fragment's
+   * Compute peak resources consumed when executing this PlanNode, initializing
+   * 'nodeResourceProfile_'. May only be called after this PlanNode has been placed in
+   * a PlanFragment because the cost computation is dependent on the enclosing fragment's
    * data partition.
    */
-  public abstract void computeResourceProfile(TQueryOptions queryOptions);
+  public abstract void computeNodeResourceProfile(TQueryOptions queryOptions);
 
   /**
-   * The default size of buffer used in spilling nodes. Used in computeResourceProfile().
+   * Wrapper class to represent resource profiles during different phases of execution.
+   */
+  public static class ExecPhaseResourceProfiles {
+    public ExecPhaseResourceProfiles(
+        ResourceProfile duringOpenProfile, ResourceProfile postOpenProfile) {
+      this.duringOpenProfile = duringOpenProfile;
+      this.postOpenProfile = postOpenProfile;
+    }
+
+    /** Peak resources consumed while Open() is executing for this subtree */
+    public final ResourceProfile duringOpenProfile;
+
+    /**
+     * Peak resources consumed for this subtree from the time when ExecNode::Open()
+     * returns until the time when ExecNode::Close() returns.
+     */
+    public final ResourceProfile postOpenProfile;
+  }
+
+  /**
+   * Recursive function used to compute the peak resources consumed by this subtree of
+   * the plan within a fragment instance. The default implementation of this function
+   * is correct for streaming and blocking PlanNodes with a single child. PlanNodes
+   * that don't meet this description must override this function.
+   *
+   * Not called for PlanNodes inside a subplan: the root SubplanNode is responsible for
+   * computing the peak resources for the entire subplan.
+   *
+   * computeNodeResourceProfile() must be called on all plan nodes in this subtree before
+   * calling this function.
+   */
+  public ExecPhaseResourceProfiles computeTreeResourceProfiles(
+      TQueryOptions queryOptions) {
+    Preconditions.checkState(
+        children_.size() <= 1, "Plan nodes with > 1 child must override");
+    if (children_.isEmpty()) {
+      return new ExecPhaseResourceProfiles(nodeResourceProfile_, nodeResourceProfile_);
+    }
+    ExecPhaseResourceProfiles childResources =
+        getChild(0).computeTreeResourceProfiles(queryOptions);
+    if (isBlockingNode()) {
+      // This node does not acquire its own resources until the child's Open() returns.
+      // The child is then closed before Open() of this node returns.
+      ResourceProfile duringOpenProfile = childResources.duringOpenProfile.max(
+          childResources.postOpenProfile.sum(nodeResourceProfile_));
+      return new ExecPhaseResourceProfiles(duringOpenProfile, nodeResourceProfile_);
+    } else {
+      // Streaming node: this node and its child subtree hold resources concurrently.
+      return new ExecPhaseResourceProfiles(
+          childResources.duringOpenProfile.sum(nodeResourceProfile_),
+          childResources.postOpenProfile.sum(nodeResourceProfile_));
+    }
+  }
+
+  /**
+   * The default size of buffer used in spilling nodes. Used in
+   * computeNodeResourceProfile().
    */
   protected final static long getDefaultSpillableBufferBytes() {
     // BufferedBlockMgr uses --read_size to determine buffer size.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 936847a..c65c668 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -58,6 +58,13 @@ import com.google.common.collect.Lists;
 public class Planner {
   private final static Logger LOG = LoggerFactory.getLogger(Planner.class);
 
+  // Minimum per-host resources applied to the query-wide per-host estimate so that the
+  // memory estimate is never zero, even if all PlanNodes in the query estimate zero.
+  public static final long MIN_PER_HOST_MEM_ESTIMATE_BYTES = 10 * 1024 * 1024;
+
+  public static final ResourceProfile MIN_PER_HOST_RESOURCES =
+      new ResourceProfile(MIN_PER_HOST_MEM_ESTIMATE_BYTES, 0);
+
   private final PlannerContext ctx_;
 
   public Planner(AnalysisContext.AnalysisResult analysisResult, TQueryCtx queryCtx) {
@@ -159,7 +166,7 @@ public class Planner {
       LOG.trace("finalize plan fragments");
     }
     for (PlanFragment fragment: fragments) {
-      fragment.finalize(ctx_.getRootAnalyzer());
+      fragment.finalizeExchanges(ctx_.getRootAnalyzer());
     }
 
     Collections.reverse(fragments);
@@ -332,49 +339,67 @@ public class Planner {
   }
 
   /**
-   * Estimates the per-host resource requirements for the given plans, and sets the
-   * results in request.
-   * TODO: The LOG.warn() messages should eventually become Preconditions checks
-   * once resource estimation is more robust.
+   * Computes the per-host resource profile for the given plans, i.e. the peak resources
+   * consumed by all fragment instances belonging to the query per host. Sets the
+   * per-host resource values in 'request'.
    */
   public void computeResourceReqs(List<PlanFragment> planRoots,
       TQueryExecRequest request) {
     Preconditions.checkState(!planRoots.isEmpty());
     Preconditions.checkNotNull(request);
-
-    // Compute the sum over all plans.
-    // TODO: Revisit during MT work - scheduling of fragments will change and computing
-    // the sum may not be correct or optimal.
-    ResourceProfile totalResources = ResourceProfile.invalid();
-    for (PlanFragment planRoot: planRoots) {
-      ResourceProfile planMaxResources = ResourceProfile.invalid();
-      ArrayList<PlanFragment> fragments = planRoot.getNodesPreOrder();
-      // Compute pipelined plan node sets.
-      ArrayList<PipelinedPlanNodeSet> planNodeSets =
-          PipelinedPlanNodeSet.computePlanNodeSets(fragments.get(0).getPlanRoot());
-
-      // Compute the max of the per-host resources requirement.
-      // Note that the different maxes may come from different plan node sets.
-      for (PipelinedPlanNodeSet planNodeSet : planNodeSets) {
-        TQueryOptions queryOptions = ctx_.getQueryOptions();
-        ResourceProfile perHostResources =
-            planNodeSet.computePerHostResources(queryOptions);
-        if (!perHostResources.isValid()) continue;
-        planMaxResources = ResourceProfile.max(planMaxResources, perHostResources);
+    TQueryOptions queryOptions = ctx_.getRootAnalyzer().getQueryOptions();
+    int mtDop = queryOptions.getMt_dop();
+
+    // Per-host peak resources, summed over all plan fragments.
+    ResourceProfile perHostPeakResources = ResourceProfile.invalid();
+    // Total of initial reservation claims in bytes by all operators in all fragment
+    // instances per host. Computed by summing the per-host minimum reservations of
+    // all plan nodes and sinks.
+    long perHostInitialReservationTotal = 0;
+
+    // Do a pass over all the fragments to compute resource profiles. Compute the
+    // profiles bottom-up since a fragment's profile may depend on its descendants.
+    List<PlanFragment> allFragments = planRoots.get(0).getNodesPostOrder();
+    for (PlanFragment fragment: allFragments) {
+      // Compute the per-node, per-sink and aggregate profiles for the fragment.
+      fragment.computeResourceProfile(ctx_.getRootAnalyzer());
+
+      // Different fragments do not synchronize their Open() and Close(), so the backend
+      // does not provide strong guarantees about whether one fragment instance releases
+      // resources before another acquires them. Conservatively assume that all fragment
+      // instances can consume their peak resources at the same time, i.e. that the
+      // query-wide peak resources are the sum of the per-fragment-instance peak
+      // resources.
+      perHostPeakResources =
+          perHostPeakResources.sum(fragment.getPerHostResourceProfile());
+      perHostInitialReservationTotal += fragment.getNumInstancesPerHost(mtDop)
+          * fragment.getSink().getResourceProfile().getMinReservationBytes();
+
+      for (PlanNode node: fragment.collectPlanNodes()) {
+        perHostInitialReservationTotal += fragment.getNumInstancesPerHost(mtDop)
+            * node.getNodeResourceProfile().getMinReservationBytes();
       }
-      totalResources = ResourceProfile.sum(totalResources, planMaxResources);
     }
 
-    Preconditions.checkState(totalResources.getMemEstimateBytes() >= 0);
-    Preconditions.checkState(totalResources.getMinReservationBytes() >= 0);
-    request.setPer_host_mem_estimate(totalResources.getMemEstimateBytes());
-    request.setPer_host_min_reservation(totalResources.getMinReservationBytes());
+    Preconditions.checkState(perHostPeakResources.getMemEstimateBytes() >= 0,
+        perHostPeakResources.getMemEstimateBytes());
+    Preconditions.checkState(perHostPeakResources.getMinReservationBytes() >= 0,
+        perHostPeakResources.getMinReservationBytes());
+
+    perHostPeakResources = MIN_PER_HOST_RESOURCES.max(perHostPeakResources);
+
+    request.setPer_host_mem_estimate(perHostPeakResources.getMemEstimateBytes());
+    request.setPer_host_min_reservation(perHostPeakResources.getMinReservationBytes());
+    request.setPer_host_initial_reservation_total_claims(perHostInitialReservationTotal);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Per-host min buffer : " + totalResources.getMinReservationBytes());
-      LOG.trace("Estimated per-host memory: " + totalResources.getMemEstimateBytes());
+      LOG.trace("Per-host min buffer : " + perHostPeakResources.getMinReservationBytes());
+      LOG.trace(
+          "Estimated per-host memory: " + perHostPeakResources.getMemEstimateBytes());
+      LOG.trace("Per-host initial reservation total: " + perHostInitialReservationTotal);
     }
   }
 
+
   /**
    * Traverses the plan tree rooted at 'root' and inverts outer and semi joins
    * in the following situations:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
index c0dc607..18cde7e 100644
--- a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
+++ b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
@@ -18,9 +18,11 @@
 package org.apache.impala.planner;
 
 import org.apache.impala.common.PrintUtils;
+import org.apache.impala.util.MathUtil;
 
 /**
- * The resources that will be consumed by a set of plan nodes.
+ * The resources that will be consumed by some part of a plan, e.g. a plan node or
+ * plan fragment.
  */
 public class ResourceProfile {
   // If the computed values are valid.
@@ -64,20 +66,29 @@ public class ResourceProfile {
     return output.toString();
   }
 
-  // Returns a profile with the max of each value in 'p1' and 'p2'.
-  public static ResourceProfile max(ResourceProfile p1, ResourceProfile p2) {
-    if (!p1.isValid()) return p2;
-    if (!p2.isValid()) return p1;
+  // Returns the per-value max of 'this' and 'other', ignoring an invalid operand.
+  public ResourceProfile max(ResourceProfile other) {
+    if (!isValid()) return other;
+    if (!other.isValid()) return this;
     return new ResourceProfile(
-        Math.max(p1.getMemEstimateBytes(), p2.getMemEstimateBytes()),
-        Math.max(p1.getMinReservationBytes(), p2.getMinReservationBytes()));
+        Math.max(getMemEstimateBytes(), other.getMemEstimateBytes()),
+        Math.max(getMinReservationBytes(), other.getMinReservationBytes()));
   }
 
-  // Returns a profile with the sum of each value in 'p1' and 'p2'.
-  public static ResourceProfile sum(ResourceProfile p1, ResourceProfile p2) {
-    if (!p1.isValid()) return p2;
-    if (!p2.isValid()) return p1;
-    return new ResourceProfile(p1.getMemEstimateBytes() + p2.getMemEstimateBytes(),
-        p1.getMinReservationBytes() + p2.getMinReservationBytes());
+  // Returns the per-value sum of 'this' and 'other', ignoring an invalid operand.
+  public ResourceProfile sum(ResourceProfile other) {
+    if (!isValid()) return other;
+    if (!other.isValid()) return this;
+    return new ResourceProfile(
+        MathUtil.saturatingAdd(getMemEstimateBytes(), other.getMemEstimateBytes()),
+        MathUtil.saturatingAdd(getMinReservationBytes(), other.getMinReservationBytes()));
+  }
+
+  // Returns a profile with all values multiplied by 'factor' (or 'this' if invalid).
+  public ResourceProfile multiply(int factor) {
+    if (!isValid()) return this;
+    return new ResourceProfile(
+        MathUtil.saturatingMultiply(memEstimateBytes_, factor),
+        MathUtil.saturatingMultiply(minReservationBytes_, factor));
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/SelectNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
index c346df9..97dfa5b 100644
--- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
@@ -82,9 +82,9 @@ public class SelectNode extends PlanNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
-    resourceProfile_ = new ResourceProfile(0, 0);
+    nodeResourceProfile_ = new ResourceProfile(0, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
index 82a1c41..bed1c9a 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
@@ -66,9 +66,9 @@ public class SingularRowSrcNode extends PlanNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
-    resourceProfile_ = new ResourceProfile(0, 0);
+    nodeResourceProfile_ = new ResourceProfile(0, 0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/64fd0115/fe/src/main/java/org/apache/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 3517bee..f628885 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -216,12 +216,12 @@ public class SortNode extends PlanNode {
   }
 
   @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkState(hasValidStats());
     if (useTopN_) {
       long perInstanceMemEstimate =
              (long) Math.ceil((cardinality_ + offset_) * avgRowSize_);
-      resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
+      nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
       return;
     }
 
@@ -253,7 +253,7 @@ public class SortNode extends PlanNode {
     if (info_.getSortTupleDescriptor().hasVarLenSlots()) {
       perInstanceMinReservation *= 2;
     }
-    resourceProfile_ =
+    nodeResourceProfile_ =
         new ResourceProfile(perInstanceMemEstimate, perInstanceMinReservation);
   }