You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/04/19 01:20:32 UTC

[5/6] incubator-impala git commit: IMPALA-3748: minimum buffer requirements in planner

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 978e0ac..d5c72a0 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -65,6 +65,10 @@ import com.google.common.math.LongMath;
 abstract public class PlanNode extends TreeNode<PlanNode> {
   private final static Logger LOG = LoggerFactory.getLogger(PlanNode.class);
 
+  // The size of buffer used in spilling nodes. Used in computeResourceProfile().
+  // TODO: IMPALA-3200: get from query option
+  protected final static long SPILLABLE_BUFFER_BYTES = 8L * 1024L * 1024L;
+
   // String used for this node in getExplainString().
   protected String displayName_;
 
@@ -110,13 +114,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   // set in computeStats(); invalid: -1
   protected int numNodes_;
 
+  // resource requirements and estimates for this plan node.
+  // set in computeResourceProfile().
+  protected ResourceProfile resourceProfile_ = null;
+
   // sum of tupleIds_' avgSerializedSizes; set in computeStats()
   protected float avgRowSize_;
 
-  // estimated per-host memory requirement for this node;
-  // set in computeCosts(); invalid: -1
-  protected long perHostMemCost_ = -1;
-
   // If true, disable codegen for this plan node.
   protected boolean disableCodegen_;
 
@@ -187,9 +191,9 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   }
   public long getLimit() { return limit_; }
   public boolean hasLimit() { return limit_ > -1; }
-  public long getPerHostMemCost() { return perHostMemCost_; }
   public long getCardinality() { return cardinality_; }
   public int getNumNodes() { return numNodes_; }
+  public ResourceProfile getResourceProfile() { return resourceProfile_; }
   public float getAvgRowSize() { return avgRowSize_; }
   public void setFragment(PlanFragment fragment) { fragment_ = fragment; }
   public PlanFragment getFragment() { return fragment_; }
@@ -235,8 +239,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     conjuncts_.clear();
   }
 
-  public String getExplainString() {
-    return getExplainString("", "", TExplainLevel.VERBOSE);
+  public String getExplainString(TQueryOptions queryOptions) {
+    return getExplainString("", "", queryOptions, TExplainLevel.VERBOSE);
   }
 
   protected void setDisplayName(String s) { displayName_ = s; }
@@ -269,7 +273,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
    * output will be prefixed by prefix.
    */
   protected final String getExplainString(String rootPrefix, String prefix,
-      TExplainLevel detailLevel) {
+      TQueryOptions queryOptions, TExplainLevel detailLevel) {
     StringBuilder expBuilder = new StringBuilder();
     String detailPrefix = prefix;
     String filler;
@@ -302,11 +306,12 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     // Output cardinality, cost estimates and tuple Ids only when explain plan level
     // is extended or above.
     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
-      // Print estimated output cardinality and memory cost.
-      expBuilder.append(PrintUtils.printHosts(detailPrefix, numNodes_));
-      expBuilder.append(PrintUtils.printMemCost(" ", perHostMemCost_) + "\n");
+      // Print resource profile.
+      expBuilder.append(detailPrefix);
+      expBuilder.append(resourceProfile_.getExplainString());
+      expBuilder.append("\n");
 
-      // Print tuple ids and row size.
+      // Print tuple ids, row size and cardinality.
       expBuilder.append(detailPrefix + "tuple-ids=");
       for (int i = 0; i < tupleIds_.size(); ++i) {
         TupleId tupleId = tupleIds_.get(i);
@@ -331,15 +336,23 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
           // we're crossing a fragment boundary
           expBuilder.append(
               child.fragment_.getExplainString(
-                childHeadlinePrefix, childDetailPrefix, detailLevel));
+                childHeadlinePrefix, childDetailPrefix, queryOptions, detailLevel));
         } else {
-          expBuilder.append(
-              child.getExplainString(childHeadlinePrefix, childDetailPrefix,
-                  detailLevel));
+          expBuilder.append(child.getExplainString(childHeadlinePrefix,
+              childDetailPrefix, queryOptions, detailLevel));
         }
         if (printFiller) expBuilder.append(filler + "\n");
       }
-      expBuilder.append(children_.get(0).getExplainString(prefix, prefix, detailLevel));
+      PlanFragment childFragment = children_.get(0).fragment_;
+      if (fragment_ != childFragment && detailLevel == TExplainLevel.EXTENDED) {
+        // we're crossing a fragment boundary - print the fragment header.
+        expBuilder.append(prefix);
+        expBuilder.append(
+            childFragment.getFragmentHeaderString(queryOptions.getMt_dop()));
+        expBuilder.append("\n");
+      }
+      expBuilder.append(
+          children_.get(0).getExplainString(prefix, prefix, queryOptions, detailLevel));
     }
     return expBuilder.toString();
   }
@@ -379,7 +392,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
 
     TExecStats estimatedStats = new TExecStats();
     estimatedStats.setCardinality(cardinality_);
-    estimatedStats.setMemory_used(perHostMemCost_);
+    estimatedStats.setMemory_used(resourceProfile_.getMemEstimateBytes());
     msg.setLabel(getDisplayLabel());
     msg.setLabel_detail(getDisplayLabelDetail());
     msg.setEstimated_stats(estimatedStats);
@@ -605,13 +618,12 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   public boolean isBlockingNode() { return false; }
 
   /**
-   * Estimates the cost of executing this PlanNode. Currently only sets perHostMemCost_.
-   * May only be called after this PlanNode has been placed in a PlanFragment because
-   * the cost computation is dependent on the enclosing fragment's data partition.
+   * Compute resources consumed when executing this PlanNode, initializing
+   * 'resource_profile_'. May only be called after this PlanNode has been placed in a
+   * PlanFragment because the cost computation is dependent on the enclosing fragment's
+   * data partition.
    */
-  public void computeCosts(TQueryOptions queryOptions) {
-    perHostMemCost_ = 0;
-  }
+  public abstract void computeResourceProfile(TQueryOptions queryOptions);
 
   /**
    * The input cardinality is the sum of output cardinalities of its children.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index a199f54..fba9149 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -20,6 +20,7 @@ package org.apache.impala.planner;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TQueryOptions;
 
 /**
  * Sink for the root of a query plan that produces result rows. Allows coordination
@@ -28,9 +29,15 @@ import org.apache.impala.thrift.TExplainLevel;
  */
 public class PlanRootSink extends DataSink {
 
-  public String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel explainLevel) {
-    return String.format("%sPLAN-ROOT SINK\n", prefix);
+  public void appendSinkExplainString(String prefix, String detailPrefix,
+      TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder output) {
+    output.append(String.format("%sPLAN-ROOT SINK\n", prefix));
+  }
+
+  @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    // TODO: add a memory estimate
+    resourceProfile_ = new ResourceProfile(0, 0);
   }
 
   protected TDataSink toThrift() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 8842c9c..ad5bba5 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -40,6 +40,7 @@ import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryExecRequest;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterMode;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.MaxRowsProcessedVisitor;
@@ -266,11 +267,14 @@ public class Planner {
       TQueryExecRequest request, TExplainLevel explainLevel) {
     StringBuilder str = new StringBuilder();
     boolean hasHeader = false;
-    if (request.isSetPer_host_mem_req() && request.isSetPer_host_vcores()) {
-      str.append(
-          String.format("Estimated Per-Host Requirements: Memory=%s VCores=%s\n",
-          PrintUtils.printBytes(request.getPer_host_mem_req()),
-          request.per_host_vcores));
+    if (request.isSetPer_host_min_reservation()) {
+      str.append(String.format("Per-Host Resource Reservation: Memory=%s\n",
+              PrintUtils.printBytes(request.getPer_host_min_reservation()))) ;
+      hasHeader = true;
+    }
+    if (request.isSetPer_host_mem_estimate()) {
+      str.append(String.format("Per-Host Resource Estimates: Memory=%s\n",
+          PrintUtils.printBytes(request.getPer_host_mem_estimate())));
       hasHeader = true;
     }
 
@@ -324,12 +328,12 @@ public class Planner {
 
     if (explainLevel.ordinal() < TExplainLevel.VERBOSE.ordinal()) {
       // Print the non-fragmented parallel plan.
-      str.append(fragments.get(0).getExplainString(explainLevel));
+      str.append(fragments.get(0).getExplainString(ctx_.getQueryOptions(), explainLevel));
     } else {
       // Print the fragmented parallel plan.
       for (int i = 0; i < fragments.size(); ++i) {
         PlanFragment fragment = fragments.get(i);
-        str.append(fragment.getExplainString(explainLevel));
+        str.append(fragment.getExplainString(ctx_.getQueryOptions(), explainLevel));
         if (i < fragments.size() - 1) str.append("\n");
       }
     }
@@ -337,91 +341,46 @@ public class Planner {
   }
 
   /**
-   * Returns true if the fragments are for a trivial, coordinator-only query:
-   * Case 1: Only an EmptySetNode, e.g. query has a limit 0.
-   * Case 2: Query has only constant exprs.
-   */
-  private static boolean isTrivialCoordOnlyPlan(List<PlanFragment> fragments) {
-    Preconditions.checkNotNull(fragments);
-    Preconditions.checkState(!fragments.isEmpty());
-    if (fragments.size() > 1) return false;
-    PlanNode root = fragments.get(0).getPlanRoot();
-    if (root instanceof EmptySetNode) return true;
-    if (root instanceof UnionNode && ((UnionNode) root).isConstantUnion()) return true;
-    return false;
-  }
-
-  /**
-   * Estimates the per-host memory and CPU requirements for the given plan fragments,
-   * and sets the results in request.
-   * Optionally excludes the requirements for unpartitioned fragments.
+   * Estimates the per-host resource requirements for the given plans, and sets the
+   * results in request.
    * TODO: The LOG.warn() messages should eventually become Preconditions checks
    * once resource estimation is more robust.
-   * TODO: Revisit and possibly remove during MT work, particularly references to vcores.
    */
-  public void computeResourceReqs(List<PlanFragment> fragments,
-      boolean excludeUnpartitionedFragments,
+  public void computeResourceReqs(List<PlanFragment> planRoots,
       TQueryExecRequest request) {
-    Preconditions.checkState(!fragments.isEmpty());
+    Preconditions.checkState(!planRoots.isEmpty());
     Preconditions.checkNotNull(request);
 
-    // Compute pipelined plan node sets.
-    ArrayList<PipelinedPlanNodeSet> planNodeSets =
-        PipelinedPlanNodeSet.computePlanNodeSets(fragments.get(0).getPlanRoot());
-
-    // Compute the max of the per-host mem and vcores requirement.
-    // Note that the max mem and vcores may come from different plan node sets.
-    long maxPerHostMem = Long.MIN_VALUE;
-    int maxPerHostVcores = Integer.MIN_VALUE;
-    for (PipelinedPlanNodeSet planNodeSet: planNodeSets) {
-      if (!planNodeSet.computeResourceEstimates(
-          excludeUnpartitionedFragments, ctx_.getQueryOptions())) {
-        continue;
-      }
-      long perHostMem = planNodeSet.getPerHostMem();
-      int perHostVcores = planNodeSet.getPerHostVcores();
-      if (perHostMem > maxPerHostMem) maxPerHostMem = perHostMem;
-      if (perHostVcores > maxPerHostVcores) maxPerHostVcores = perHostVcores;
-    }
-
-    // Do not ask for more cores than are in the RuntimeEnv.
-    maxPerHostVcores = Math.min(maxPerHostVcores, RuntimeEnv.INSTANCE.getNumCores());
-
-    // Special case for some trivial coordinator-only queries (IMPALA-3053, IMPALA-1092).
-    if (isTrivialCoordOnlyPlan(fragments)) {
-      maxPerHostMem = 1024;
-      maxPerHostVcores = 1;
-    }
-
-    // Set costs to zero if there are only unpartitioned fragments and
-    // excludeUnpartitionedFragments is true.
-    // TODO: handle this case with a better indication for unknown, e.g. -1 or not set.
-    if (maxPerHostMem == Long.MIN_VALUE || maxPerHostVcores == Integer.MIN_VALUE) {
-      boolean allUnpartitioned = true;
-      for (PlanFragment fragment: fragments) {
-        if (fragment.isPartitioned()) {
-          allUnpartitioned = false;
-          break;
-        }
+    // Compute the sum over all plans.
+    // TODO: Revisit during MT work - scheduling of fragments will change and computing
+    // the sum may not be correct or optimal.
+    ResourceProfile totalResources = ResourceProfile.invalid();
+    for (PlanFragment planRoot: planRoots) {
+      ResourceProfile planMaxResources = ResourceProfile.invalid();
+      ArrayList<PlanFragment> fragments = planRoot.getNodesPreOrder();
+      // Compute pipelined plan node sets.
+      ArrayList<PipelinedPlanNodeSet> planNodeSets =
+          PipelinedPlanNodeSet.computePlanNodeSets(fragments.get(0).getPlanRoot());
+
+      // Compute the max of the per-host resources requirement.
+      // Note that the different maxes may come from different plan node sets.
+      for (PipelinedPlanNodeSet planNodeSet : planNodeSets) {
+        TQueryOptions queryOptions = ctx_.getQueryOptions();
+        ResourceProfile perHostResources =
+            planNodeSet.computePerHostResources(queryOptions);
+        if (!perHostResources.isValid()) continue;
+        planMaxResources = ResourceProfile.max(planMaxResources, perHostResources);
       }
-      if (allUnpartitioned && excludeUnpartitionedFragments) {
-        maxPerHostMem = 0;
-        maxPerHostVcores = 0;
-      }
-    }
-
-    if (maxPerHostMem < 0 || maxPerHostMem == Long.MIN_VALUE) {
-      LOG.warn("Invalid per-host memory requirement: " + maxPerHostMem);
-    }
-    if (maxPerHostVcores < 0 || maxPerHostVcores == Integer.MIN_VALUE) {
-      LOG.warn("Invalid per-host virtual cores requirement: " + maxPerHostVcores);
+      totalResources = ResourceProfile.sum(totalResources, planMaxResources);
     }
-    request.setPer_host_mem_req(maxPerHostMem);
-    request.setPer_host_vcores((short) maxPerHostVcores);
 
+    Preconditions.checkState(totalResources.getMemEstimateBytes() >= 0);
+    Preconditions.checkState(totalResources.getMinReservationBytes() >= 0);
+    request.setPer_host_mem_estimate(totalResources.getMemEstimateBytes());
+    request.setPer_host_min_reservation(totalResources.getMinReservationBytes());
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Estimated per-host peak memory requirement: " + maxPerHostMem);
-      LOG.trace("Estimated per-host virtual cores requirement: " + maxPerHostVcores);
+      LOG.trace("Per-host min buffer : " + totalResources.getMinReservationBytes());
+      LOG.trace("Estimated per-host memory: " + totalResources.getMemEstimateBytes());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
new file mode 100644
index 0000000..c0dc607
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import org.apache.impala.common.PrintUtils;
+
+/**
+ * The resources that will be consumed by a set of plan nodes.
+ */
+public class ResourceProfile {
+  // If the computed values are valid.
+  private final boolean isValid_;
+
+  // Estimated memory consumption in bytes.
+  // TODO: IMPALA-5013: currently we are inconsistent about how these estimates are
+  // derived or what they mean. Re-evaluate what they mean and either deprecate or
+  // fix them.
+  private final long memEstimateBytes_;
+
+  // Minimum buffer reservation required to execute in bytes.
+  private final long minReservationBytes_;
+
+  private ResourceProfile(boolean isValid, long memEstimateBytes, long minReservationBytes) {
+    isValid_ = isValid;
+    memEstimateBytes_ = memEstimateBytes;
+    minReservationBytes_ = minReservationBytes;
+  }
+
+  public ResourceProfile(long memEstimateBytes, long minReservationBytes) {
+    this(true, memEstimateBytes, minReservationBytes);
+  }
+
+  public static ResourceProfile invalid() {
+    return new ResourceProfile(false, -1, -1);
+  }
+
+  public boolean isValid() { return isValid_; }
+  public long getMemEstimateBytes() { return memEstimateBytes_; }
+  public long getMinReservationBytes() { return minReservationBytes_; }
+
+  // Return a string with the resource profile information suitable for display in an
+  // explain plan in a format like: "resource1=value resource2=value"
+  public String getExplainString() {
+    StringBuilder output = new StringBuilder();
+    output.append("mem-estimate=");
+    output.append(isValid_ ? PrintUtils.printBytes(memEstimateBytes_) : "invalid");
+    output.append(" mem-reservation=");
+    output.append(isValid_ ? PrintUtils.printBytes(minReservationBytes_) : "invalid");
+    return output.toString();
+  }
+
+  // Returns a profile with the max of each value in 'p1' and 'p2'.
+  public static ResourceProfile max(ResourceProfile p1, ResourceProfile p2) {
+    if (!p1.isValid()) return p2;
+    if (!p2.isValid()) return p1;
+    return new ResourceProfile(
+        Math.max(p1.getMemEstimateBytes(), p2.getMemEstimateBytes()),
+        Math.max(p1.getMinReservationBytes(), p2.getMinReservationBytes()));
+  }
+
+  // Returns a profile with the sum of each value in 'p1' and 'p2'.
+  public static ResourceProfile sum(ResourceProfile p1, ResourceProfile p2) {
+    if (!p1.isValid()) return p2;
+    if (!p2.isValid()) return p1;
+    return new ResourceProfile(p1.getMemEstimateBytes() + p2.getMemEstimateBytes(),
+        p1.getMinReservationBytes() + p2.getMinReservationBytes());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/SelectNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
index e09d572..c346df9 100644
--- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
@@ -27,6 +27,8 @@ import org.apache.impala.analysis.Expr;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -80,6 +82,12 @@ public class SelectNode extends PlanNode {
   }
 
   @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    // TODO: add an estimate
+    resourceProfile_ = new ResourceProfile(0, 0);
+  }
+
+  @Override
   protected String getNodeExplainString(String prefix, String detailPrefix,
       TExplainLevel detailLevel) {
     StringBuilder output = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
index 5b66d18..82a1c41 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
@@ -22,6 +22,8 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -64,6 +66,12 @@ public class SingularRowSrcNode extends PlanNode {
   }
 
   @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    // TODO: add an estimate
+    resourceProfile_ = new ResourceProfile(0, 0);
+  }
+
+  @Override
   protected String getNodeExplainString(String prefix, String detailPrefix,
       TExplainLevel detailLevel) {
     StringBuilder output = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 0533b22..ef05499 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -205,10 +205,12 @@ public class SortNode extends PlanNode {
   }
 
   @Override
-  public void computeCosts(TQueryOptions queryOptions) {
+  public void computeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkState(hasValidStats());
     if (useTopN_) {
-      perHostMemCost_ = (long) Math.ceil((cardinality_ + offset_) * avgRowSize_);
+      long perInstanceMemEstimate =
+              (long) Math.ceil((cardinality_ + offset_) * avgRowSize_);
+      resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
       return;
     }
 
@@ -233,7 +235,15 @@ public class SortNode extends PlanNode {
     // doubles the block size when there are var-len columns present.
     if (hasVarLenSlots) blockSize *= 2;
     double numInputBlocks = Math.ceil(fullInputSize / blockSize);
-    perHostMemCost_ = blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
+    long perInstanceMemEstimate = blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
+
+    // Must be kept in sync with min_buffers_required in Sorter in be.
+    long perInstanceMinReservation = 3 * SPILLABLE_BUFFER_BYTES;
+    if (info_.getSortTupleDescriptor().hasVarLenSlots()) {
+      perInstanceMinReservation *= 2;
+    }
+    resourceProfile_ =
+        new ResourceProfile(perInstanceMemEstimate, perInstanceMinReservation);
   }
 
   private static String getDisplayName(boolean isTopN, boolean isMergeOnly) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
index 6143255..cbe7087 100644
--- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
@@ -22,6 +22,8 @@ import org.apache.impala.common.InternalException;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -91,6 +93,12 @@ public class SubplanNode extends PlanNode {
   }
 
   @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    // TODO: add an estimate
+    resourceProfile_ = new ResourceProfile(0, 0);
+  }
+
+  @Override
   protected String getNodeExplainString(String prefix, String detailPrefix,
       TExplainLevel detailLevel) {
     StringBuilder output = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/UnionNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index d724c59..6b8d331 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -30,6 +30,7 @@ import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TUnionNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -127,6 +128,12 @@ public class UnionNode extends PlanNode {
     }
   }
 
+  @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    // TODO: add an estimate
+    resourceProfile_ = new ResourceProfile(0, 0);
+  }
+
   /**
    * Returns true if rows from the child with 'childTupleIds' and 'childResultExprs' can
    * be returned directly by the union node (without materialization into a new tuple).

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
index 35abc55..5847e62 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
@@ -24,6 +24,7 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TUnnestNode;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -73,6 +74,12 @@ public class UnnestNode extends PlanNode {
   }
 
   @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    // TODO: add an estimate
+    resourceProfile_ = new ResourceProfile(0, 0);
+  }
+
+  @Override
   protected String getNodeExplainString(String prefix, String detailPrefix,
       TExplainLevel detailLevel) {
     StringBuilder output = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index d3cefa6..1348129 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -962,6 +962,9 @@ public class Frontend {
       }
     }
 
+    // Clear pre-existing lists to avoid adding duplicate entries in FE tests.
+    queryCtx.unsetTables_missing_stats();
+    queryCtx.unsetTables_with_corrupt_stats();
     for (TTableName tableName: tablesMissingStats) {
       queryCtx.addToTables_missing_stats(tableName);
     }
@@ -972,16 +975,6 @@ public class Frontend {
       queryCtx.addToTables_missing_diskids(tableName);
     }
 
-    // Compute resource requirements after scan range locations because the cost
-    // estimates of scan nodes rely on them.
-    try {
-      planner.computeResourceReqs(fragments, true, queryExecRequest);
-    } catch (Exception e) {
-      // Turn exceptions into a warning to allow the query to execute.
-      LOG.error("Failed to compute resource requirements for query\n" +
-          queryCtx.client_request.getStmt(), e);
-    }
-
     // The fragment at this point has all state set, serialize it to thrift.
     for (PlanFragment fragment: fragments) {
       TPlanFragment thriftFragment = fragment.toThrift();
@@ -1020,6 +1013,10 @@ public class Frontend {
           createPlanExecInfo(planRoot, planner, queryCtx, result));
     }
 
+    // Compute resource requirements after scan range locations because the cost
+    // estimates of scan nodes rely on them.
+    planner.computeResourceReqs(planRoots, result);
+
     // Optionally disable spilling in the backend. Allow spilling if there are plan hints
     // or if all tables have stats.
     boolean disableSpilling =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 5148f68..363c59c 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -369,4 +369,14 @@ public class PlannerTest extends PlannerTestBase {
     // Check that the effective MT_DOP is as expected.
     Assert.assertEquals(actualMtDop, expectedMtDop);
   }
+
+  @Test
+  public void testResourceRequirements() {
+    // Tests the resource requirement computation from the planner.
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine
+    runPlannerTestFile("resource-requirements", options, false);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index d354897..eceaeca 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -390,7 +390,8 @@ public class PlannerTestBase extends FrontendTestBase {
    * of 'testCase'.
    */
   private void runTestCase(TestCase testCase, StringBuilder errorLog,
-      StringBuilder actualOutput, String dbName, TQueryOptions options)
+      StringBuilder actualOutput, String dbName, TQueryOptions options,
+      boolean ignoreExplainHeader)
       throws CatalogException {
     if (options == null) {
       options = defaultQueryOptions();
@@ -408,16 +409,18 @@ public class PlannerTestBase extends FrontendTestBase {
         dbName, System.getProperty("user.name"));
     queryCtx.client_request.query_options = options;
     // Test single node plan, scan range locations, and column lineage.
-    TExecRequest singleNodeExecRequest =
-        testPlan(testCase, Section.PLAN, queryCtx, errorLog, actualOutput);
+    TExecRequest singleNodeExecRequest = testPlan(testCase, Section.PLAN, queryCtx,
+        ignoreExplainHeader, errorLog, actualOutput);
     validateTableIds(singleNodeExecRequest);
     checkScanRangeLocations(testCase, singleNodeExecRequest, errorLog, actualOutput);
     checkColumnLineage(testCase, singleNodeExecRequest, errorLog, actualOutput);
     checkLimitCardinality(query, singleNodeExecRequest, errorLog);
     // Test distributed plan.
-    testPlan(testCase, Section.DISTRIBUTEDPLAN, queryCtx, errorLog, actualOutput);
+    testPlan(testCase, Section.DISTRIBUTEDPLAN, queryCtx, ignoreExplainHeader, errorLog,
+        actualOutput);
     // test parallel plans
-    testPlan(testCase, Section.PARALLELPLANS, queryCtx, errorLog, actualOutput);
+    testPlan(testCase, Section.PARALLELPLANS, queryCtx, ignoreExplainHeader, errorLog,
+        actualOutput);
   }
 
   /**
@@ -471,19 +474,26 @@ public class PlannerTestBase extends FrontendTestBase {
    *
    * Returns the produced exec request or null if there was an error generating
    * the plan.
+   *
+   * If ignoreExplainHeader is true, the explain header with warnings and resource
+   * estimates is stripped out.
    */
   private TExecRequest testPlan(TestCase testCase, Section section,
-      TQueryCtx queryCtx, StringBuilder errorLog, StringBuilder actualOutput) {
+      TQueryCtx queryCtx, boolean ignoreExplainHeader,
+      StringBuilder errorLog, StringBuilder actualOutput) {
     String query = testCase.getQuery();
     queryCtx.client_request.setStmt(query);
+    TQueryOptions queryOptions = queryCtx.client_request.getQuery_options();
     if (section == Section.PLAN) {
-      queryCtx.client_request.getQuery_options().setNum_nodes(1);
+      queryOptions.setNum_nodes(1);
     } else {
       // for distributed and parallel execution we want to run on all available nodes
-      queryCtx.client_request.getQuery_options().setNum_nodes(
+      queryOptions.setNum_nodes(
           ImpalaInternalServiceConstants.NUM_NODES_ALL);
     }
-    if (section == Section.PARALLELPLANS) {
+    if (section == Section.PARALLELPLANS
+        && (!queryOptions.isSetMt_dop() || queryOptions.getMt_dop() == 0)) {
+      // Set mt_dop to force production of parallel plans.
       queryCtx.client_request.query_options.setMt_dop(2);
     }
     ArrayList<String> expectedPlan = testCase.getSectionContents(section);
@@ -507,7 +517,8 @@ public class PlannerTestBase extends FrontendTestBase {
     // Failed to produce an exec request.
     if (execRequest == null) return null;
 
-    String explainStr = removeExplainHeader(explainBuilder.toString());
+    String explainStr = explainBuilder.toString();
+    if (ignoreExplainHeader) explainStr = removeExplainHeader(explainStr);
     actualOutput.append(explainStr);
     LOG.info(section.toString() + ":" + explainStr);
     if (expectedErrorMsg != null) {
@@ -702,10 +713,16 @@ public class PlannerTestBase extends FrontendTestBase {
   }
 
   protected void runPlannerTestFile(String testFile, TQueryOptions options) {
-    runPlannerTestFile(testFile, "default", options);
+    runPlannerTestFile(testFile, options, true);
+  }
+
+  protected void runPlannerTestFile(String testFile, TQueryOptions options,
+      boolean ignoreExplainHeader) {
+    runPlannerTestFile(testFile, "default", options, ignoreExplainHeader);
   }
 
-  private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options) {
+  private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options,
+      boolean ignoreExplainHeader) {
     String fileName = testDir_ + "/" + testFile + ".test";
     TestFileParser queryFileParser = new TestFileParser(fileName);
     StringBuilder actualOutput = new StringBuilder();
@@ -716,7 +733,8 @@ public class PlannerTestBase extends FrontendTestBase {
       actualOutput.append(testCase.getSectionAsString(Section.QUERY, true, "\n"));
       actualOutput.append("\n");
       try {
-        runTestCase(testCase, errorLog, actualOutput, dbName, options);
+        runTestCase(testCase, errorLog, actualOutput, dbName, options,
+                ignoreExplainHeader);
       } catch (CatalogException e) {
         errorLog.append(String.format("Failed to plan query\n%s\n%s",
             testCase.getQuery(), e.getMessage()));
@@ -743,10 +761,10 @@ public class PlannerTestBase extends FrontendTestBase {
   }
 
   protected void runPlannerTestFile(String testFile) {
-    runPlannerTestFile(testFile, "default", null);
+    runPlannerTestFile(testFile, "default", null, true);
   }
 
   protected void runPlannerTestFile(String testFile, String dbName) {
-    runPlannerTestFile(testFile, dbName, null);
+    runPlannerTestFile(testFile, dbName, null, true);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 3a9855d..7effd9b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -4,42 +4,44 @@ from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems
 where 5 + 5 < c_custkey and o_orderkey = (2 + 2)
   and (coalesce(2, 3, 4) * 10) + l_linenumber < (0 * 1)
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 01:SUBPLAN
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=2,1,0 row-size=52B cardinality=1500000
 |
 |--08:NESTED LOOP JOIN [CROSS JOIN]
-|  |  hosts=3 per-host-mem=unavailable
+|  |  mem-estimate=24B mem-reservation=0B
 |  |  tuple-ids=2,1,0 row-size=52B cardinality=100
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
-|  |     hosts=3 per-host-mem=unavailable
+|  |     mem-estimate=0B mem-reservation=0B
 |  |     tuple-ids=0 row-size=24B cardinality=1
 |  |
 |  04:SUBPLAN
-|  |  hosts=3 per-host-mem=unavailable
+|  |  mem-estimate=0B mem-reservation=0B
 |  |  tuple-ids=2,1 row-size=28B cardinality=100
 |  |
 |  |--07:NESTED LOOP JOIN [CROSS JOIN]
-|  |  |  hosts=3 per-host-mem=unavailable
+|  |  |  mem-estimate=24B mem-reservation=0B
 |  |  |  tuple-ids=2,1 row-size=28B cardinality=10
 |  |  |
 |  |  |--05:SINGULAR ROW SRC
 |  |  |     parent-subplan=04
-|  |  |     hosts=3 per-host-mem=unavailable
+|  |  |     mem-estimate=0B mem-reservation=0B
 |  |  |     tuple-ids=1 row-size=24B cardinality=1
 |  |  |
 |  |  06:UNNEST [o.o_lineitems]
 |  |     parent-subplan=04
-|  |     hosts=3 per-host-mem=unavailable
+|  |     mem-estimate=0B mem-reservation=0B
 |  |     tuple-ids=2 row-size=0B cardinality=10
 |  |
 |  03:UNNEST [c.c_orders o]
 |     parent-subplan=01
-|     hosts=3 per-host-mem=unavailable
+|     mem-estimate=0B mem-reservation=0B
 |     tuple-ids=1 row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
@@ -51,7 +53,7 @@ PLAN-ROOT SINK
    columns missing stats: c_orders
    parquet statistics predicates: c_custkey > 10
    parquet dictionary predicates: c_custkey > 10
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=176.00MB mem-reservation=0B
    tuple-ids=0 row-size=24B cardinality=15000
 ====
 # Test HBase scan node.
@@ -59,7 +61,9 @@ select * from functional_hbase.stringids
 where string_col = cast(4 as string) and 2 + 3 = tinyint_col
   and id between concat('1', '0') and upper('20')
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 00:SCAN HBASE [functional_hbase.stringids]
    start key: 10
@@ -68,19 +72,21 @@ PLAN-ROOT SINK
    predicates: tinyint_col = 5, string_col = '4'
    table stats: 10000 rows total
    column stats: all
-   hosts=100 per-host-mem=unavailable
+   mem-estimate=1.00GB mem-reservation=0B
    tuple-ids=0 row-size=119B cardinality=1
 ====
 # Test datasource scan node.
 select * from functional.alltypes_datasource
 where tinyint_col < (pow(2, 8)) and float_col != 0 and 1 + 1 > int_col
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 00:SCAN DATA SOURCE [functional.alltypes_datasource]
 data source predicates: tinyint_col < 256, int_col < 2
 predicates: float_col != 0
-   hosts=1 per-host-mem=unavailable
+   mem-estimate=1.00GB mem-reservation=0B
    tuple-ids=0 row-size=116B cardinality=500
 ====
 # Test aggregation.
@@ -91,20 +97,22 @@ having 1024 * 1024 * count(*) % 2 = 0
   and (sm > 1 or sm > 1)
   and (sm between 5 and 10)
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum(2 + id), count(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00'
 |  having: sum(2 + id) <= 10, sum(2 + id) > 1, sum(2 + id) >= 5, 1048576 * count(*) % 2 = 0
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=10.00MB mem-reservation=264.00MB
 |  tuple-ids=1 row-size=17B cardinality=0
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    table stats: 7300 rows total
    column stats: all
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=20B cardinality=7300
 ====
 # Test hash join.
@@ -114,13 +122,15 @@ left outer join functional.alltypes b
       a.int_col between 0 + 0 + 0 + b.bigint_col and b.bigint_col + ascii('a'))
 where round(1.11 + 2.22 + 3.33 + 4.44, 1) < cast(b.double_col as decimal(3, 2))
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 02:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: 2 + a.id = b.id - 2
 |  other join predicates: a.int_col <= b.bigint_col + 97, a.int_col >= 0 + b.bigint_col
 |  other predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=15.68KB mem-reservation=136.00MB
 |  tuple-ids=0,1N row-size=28B cardinality=7300
 |
 |--01:SCAN HDFS [functional.alltypes b]
@@ -129,14 +139,14 @@ PLAN-ROOT SINK
 |     table stats: 7300 rows total
 |     column stats: all
 |     parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-|     hosts=3 per-host-mem=unavailable
+|     mem-estimate=128.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=20B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    table stats: 7300 rows total
    column stats: all
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=7300
 ====
 # Test nested-loop join. Same as above but and with a disjunction in the On clause.
@@ -147,12 +157,14 @@ left outer join functional.alltypes b
       a.int_col between 0 + 0 + 0 + b.bigint_col and b.bigint_col + ascii('a'))
 where cast(b.double_col as decimal(3, 2)) > round(1.11 + 2.22 + 3.33 + 4.44, 1)
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 02:NESTED LOOP JOIN [LEFT OUTER JOIN]
 |  join predicates: (2 + a.id = b.id - 2 OR a.int_col >= 0 + b.bigint_col AND a.int_col <= b.bigint_col + 97)
 |  predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=14.26KB mem-reservation=0B
 |  tuple-ids=0,1N row-size=28B cardinality=7300
 |
 |--01:SCAN HDFS [functional.alltypes b]
@@ -161,14 +173,14 @@ PLAN-ROOT SINK
 |     table stats: 7300 rows total
 |     column stats: all
 |     parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-|     hosts=3 per-host-mem=unavailable
+|     mem-estimate=128.00MB mem-reservation=0B
 |     tuple-ids=1 row-size=20B cardinality=730
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
    table stats: 7300 rows total
    column stats: all
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=7300
 ====
 # Test distinct aggregation with grouping.
@@ -177,26 +189,28 @@ from functional.alltypes
 group by timestamp_col = cast('2015-11-15' as timestamp) + interval 1 year
 having 1024 * 1024 * count(*) % 2 = 0
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 02:AGGREGATE [FINALIZE]
 |  output: sum(2 + id), count:merge(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00'
 |  having: 1048576 * count(*) % 2 = 0
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=10.00MB mem-reservation=264.00MB
 |  tuple-ids=2 row-size=17B cardinality=0
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00', 2 + id
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=10.00MB mem-reservation=264.00MB
 |  tuple-ids=1 row-size=17B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    table stats: 7300 rows total
    column stats: all
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=20B cardinality=7300
 ====
 # Test non-grouping distinct aggregation.
@@ -204,25 +218,27 @@ select sum(distinct 1 + 1 + id)
 from functional.alltypes
 having 1024 * 1024 * count(*) % 2 = 0
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 02:AGGREGATE [FINALIZE]
 |  output: sum(2 + id), count:merge(*)
 |  having: 1048576 * zeroifnull(count(*)) % 2 = 0
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=10.00MB mem-reservation=0B
 |  tuple-ids=2 row-size=16B cardinality=0
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: 2 + id
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=10.00MB mem-reservation=264.00MB
 |  tuple-ids=1 row-size=16B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    table stats: 7300 rows total
    column stats: all
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=4B cardinality=7300
 ====
 # Test analytic eval node.
@@ -231,44 +247,48 @@ select first_value(1 + 1 + int_col - (1 - 1)) over
    order by greatest(greatest(10, 20), bigint_col))
 from functional.alltypes
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 02:ANALYTIC
 |  functions: first_value(2 + int_col - 0)
 |  partition by: concat('ab', string_col)
 |  order by: greatest(20, bigint_col) ASC
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=0B mem-reservation=16.00MB
 |  tuple-ids=3,2 row-size=37B cardinality=7300
 |
 01:SORT
 |  order by: concat('ab', string_col) ASC NULLS FIRST, greatest(20, bigint_col) ASC
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=16.00MB mem-reservation=48.00MB
 |  tuple-ids=3 row-size=29B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    table stats: 7300 rows total
    column stats: all
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=29B cardinality=7300
 ====
 # Test sort node.
 select int_col from functional.alltypes
 order by id * abs((factorial(5) / power(2, 4)))
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 01:SORT
 |  order by: id * 7.5 ASC
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=8.00MB mem-reservation=24.00MB
 |  tuple-ids=1 row-size=8B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
    table stats: 7300 rows total
    column stats: all
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=7300
 ====
 # Test HDFS table sink.
@@ -276,15 +296,16 @@ insert into functional.alltypes (id, int_col) partition(year,month)
 select id, int_col, cast(1 + 1 + 1 + year as int), cast(month - (1 - 1 - 1) as int)
 from functional.alltypessmall
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(CAST(3 + year AS INT),CAST(month - -1 AS INT))]
 |  partitions=4
-|  hosts=1 per-host-mem=unavailable
+|  mem-estimate=1.56KB mem-reservation=0B
 |
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
    table stats: 100 rows total
    column stats: all
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=32.00MB mem-reservation=0B
    tuple-ids=0 row-size=16B cardinality=100
 ====
 # Constant folding does not work across query blocks.
@@ -295,11 +316,13 @@ select sum(id + c3) from
     ) v2
   ) v3
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum(id + 10 + 20 + 30)
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=10.00MB mem-reservation=0B
 |  tuple-ids=4 row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]
@@ -307,6 +330,6 @@ PLAN-ROOT SINK
    table stats: 7300 rows total
    column stats: all
    limit: 2
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=4B cardinality=2
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
index b064d2b..bad3299 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
@@ -1,100 +1,112 @@
 select * from functional_kudu.zipcode_incomes where id = '8600000US00601'
 ---- PLAN
-F00:PLAN FRAGMENT [UNPARTITIONED]
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
   PLAN-ROOT SINK
+  |  mem-estimate=0B mem-reservation=0B
   |
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      kudu predicates: id = '8600000US00601'
-     hosts=3 per-host-mem=unavailable
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=124B cardinality=1
 ---- DISTRIBUTEDPLAN
-F01:PLAN FRAGMENT [UNPARTITIONED]
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
   PLAN-ROOT SINK
+  |  mem-estimate=0B mem-reservation=0B
   |
   01:EXCHANGE [UNPARTITIONED]
-     hosts=3 per-host-mem=unavailable
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=124B cardinality=1
 
-F00:PLAN FRAGMENT [RANDOM]
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
   DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      kudu predicates: id = '8600000US00601'
-     hosts=3 per-host-mem=0B
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=124B cardinality=1
 ====
 # The cardinality from "zip = '2'" should dominate.
 select * from functional_kudu.zipcode_incomes where id != '1' and zip = '2'
 ---- PLAN
-F00:PLAN FRAGMENT [UNPARTITIONED]
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
   PLAN-ROOT SINK
+  |  mem-estimate=0B mem-reservation=0B
   |
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      predicates: id != '1'
      kudu predicates: zip = '2'
-     hosts=3 per-host-mem=unavailable
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=124B cardinality=1
 ---- DISTRIBUTEDPLAN
-F01:PLAN FRAGMENT [UNPARTITIONED]
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
   PLAN-ROOT SINK
+  |  mem-estimate=0B mem-reservation=0B
   |
   01:EXCHANGE [UNPARTITIONED]
-     hosts=3 per-host-mem=unavailable
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=124B cardinality=1
 
-F00:PLAN FRAGMENT [RANDOM]
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
   DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      predicates: id != '1'
      kudu predicates: zip = '2'
-     hosts=3 per-host-mem=0B
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=124B cardinality=1
 ====
 select * from functional_kudu.zipcode_incomes where id > '1' and zip > '2'
 ---- PLAN
-F00:PLAN FRAGMENT [UNPARTITIONED]
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
   PLAN-ROOT SINK
+  |  mem-estimate=0B mem-reservation=0B
   |
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      kudu predicates: zip > '2', id > '1'
-     hosts=3 per-host-mem=unavailable
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=124B cardinality=3317
 ---- DISTRIBUTEDPLAN
-F01:PLAN FRAGMENT [UNPARTITIONED]
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
   PLAN-ROOT SINK
+  |  mem-estimate=0B mem-reservation=0B
   |
   01:EXCHANGE [UNPARTITIONED]
-     hosts=3 per-host-mem=unavailable
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=124B cardinality=3317
 
-F00:PLAN FRAGMENT [RANDOM]
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
   DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      kudu predicates: zip > '2', id > '1'
-     hosts=3 per-host-mem=0B
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=124B cardinality=3317
 ====
 select * from functional_kudu.zipcode_incomes where id = '1' or id = '2'
 ---- PLAN
-F00:PLAN FRAGMENT [UNPARTITIONED]
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
   PLAN-ROOT SINK
+  |  mem-estimate=0B mem-reservation=0B
   |
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      predicates: id = '1' OR id = '2'
-     hosts=3 per-host-mem=unavailable
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=124B cardinality=2
 ---- DISTRIBUTEDPLAN
-F01:PLAN FRAGMENT [UNPARTITIONED]
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
   PLAN-ROOT SINK
+  |  mem-estimate=0B mem-reservation=0B
   |
   01:EXCHANGE [UNPARTITIONED]
-     hosts=3 per-host-mem=unavailable
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=124B cardinality=2
 
-F00:PLAN FRAGMENT [RANDOM]
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
   DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B
   00:SCAN KUDU [functional_kudu.zipcode_incomes]
      predicates: id = '1' OR id = '2'
-     hosts=3 per-host-mem=0B
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=124B cardinality=2
 ====
 select * from functional_kudu.alltypes where
@@ -121,13 +133,14 @@ double_col in (cast('inf' as double)) and
 string_col not in ("bar") and
 id in (int_col)
 ---- PLAN
-F00:PLAN FRAGMENT [UNPARTITIONED]
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
   PLAN-ROOT SINK
+  |  mem-estimate=0B mem-reservation=0B
   |
   00:SCAN KUDU [functional_kudu.alltypes]
      predicates: id IN (int_col), string_col NOT IN ('bar'), bigint_col IN (9999999999999999999), double_col IN (CAST('inf' AS DOUBLE)), float_col IN (CAST('NaN' AS FLOAT)), int_col IN (9999999999), smallint_col IN (99999, 2), tinyint_col IN (1, 999), bool_col IN (1)
      kudu predicates: double_col IN (0.0), float_col IN (0.0), bigint_col IN (1, 2), int_col IN (1, 2), smallint_col IN (0, 2), string_col IN ('foo', 'foo       '), tinyint_col IN (1, 2), bool_col IN (TRUE)
-     hosts=3 per-host-mem=unavailable
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=126B cardinality=4
 ====
 select * from functional_kudu.alltypes where
@@ -135,12 +148,13 @@ tinyint_col is not null and
 smallint_col is null and
 cast(date_string_col as tinyint) is null
 ---- PLAN
-F00:PLAN FRAGMENT [UNPARTITIONED]
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
   PLAN-ROOT SINK
+  |  mem-estimate=0B mem-reservation=0B
   |
   00:SCAN KUDU [functional_kudu.alltypes]
      predicates: CAST(date_string_col AS TINYINT) IS NULL
      kudu predicates: smallint_col IS NULL, tinyint_col IS NOT NULL
-     hosts=3 per-host-mem=unavailable
+     mem-estimate=0B mem-reservation=0B
      tuple-ids=0 row-size=126B cardinality=730
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index 4d9544d..e3dd297 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -39,17 +39,19 @@ group by bigint_col
 order by cnt, bigint_col
 limit 10
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 02:TOP-N [LIMIT=10]
 |  order by: count(int_col) ASC, bigint_col ASC
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=160B mem-reservation=0B
 |  tuple-ids=2 row-size=16B cardinality=10
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(int_col)
 |  group by: bigint_col
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=128.00MB mem-reservation=264.00MB
 |  tuple-ids=1 row-size=16B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypes]
@@ -59,36 +61,40 @@ PLAN-ROOT SINK
    column stats: unavailable
    parquet statistics predicates: id < 10
    parquet dictionary predicates: id < 10
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=16B cardinality=unavailable
 ---- PARALLELPLANS
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 05:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: count(int_col) ASC, bigint_col ASC
 |  limit: 10
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=2 row-size=16B cardinality=10
 |
+F01:PLAN FRAGMENT [HASH(bigint_col)] hosts=3 instances=9
 02:TOP-N [LIMIT=10]
 |  order by: count(int_col) ASC, bigint_col ASC
-|  hosts=3 per-host-mem=160B
+|  mem-estimate=160B mem-reservation=0B
 |  tuple-ids=2 row-size=16B cardinality=10
 |
 04:AGGREGATE [FINALIZE]
 |  output: count:merge(int_col)
 |  group by: bigint_col
-|  hosts=3 per-host-mem=128.00MB
+|  mem-estimate=128.00MB mem-reservation=264.00MB
 |  tuple-ids=1 row-size=16B cardinality=unavailable
 |
 03:EXCHANGE [HASH(bigint_col)]
-|  hosts=3 per-host-mem=0B
+|  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=1 row-size=16B cardinality=unavailable
 |
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
 01:AGGREGATE [STREAMING]
 |  output: count(int_col)
 |  group by: bigint_col
-|  hosts=3 per-host-mem=128.00MB
+|  mem-estimate=128.00MB mem-reservation=0B
 |  tuple-ids=1 row-size=16B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
@@ -98,27 +104,29 @@ PLAN-ROOT SINK
    column stats: unavailable
    parquet statistics predicates: id < 10
    parquet dictionary predicates: id < 10
-   hosts=3 per-host-mem=16.00MB
+   mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=16B cardinality=unavailable
 ====
-# Single-table scan/filter/analysic should work.
+# Single-table scan/filter/analytic should work.
 select row_number() over(partition by int_col order by id)
 from functional_parquet.alltypes
 where id < 10
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 02:ANALYTIC
 |  functions: row_number()
 |  partition by: int_col
 |  order by: id ASC
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=0B mem-reservation=16.00MB
 |  tuple-ids=4,3 row-size=16B cardinality=unavailable
 |
 01:SORT
 |  order by: int_col ASC NULLS FIRST, id ASC
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=0B mem-reservation=24.00MB
 |  tuple-ids=4 row-size=8B cardinality=unavailable
 |
 00:SCAN HDFS [functional_parquet.alltypes]
@@ -128,32 +136,36 @@ PLAN-ROOT SINK
    column stats: unavailable
    parquet statistics predicates: id < 10
    parquet dictionary predicates: id < 10
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=unavailable
 ---- PARALLELPLANS
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 04:EXCHANGE [UNPARTITIONED]
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=4,3 row-size=16B cardinality=unavailable
 |
+F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=9
 02:ANALYTIC
 |  functions: row_number()
 |  partition by: int_col
 |  order by: id ASC
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-|  hosts=3 per-host-mem=0B
+|  mem-estimate=0B mem-reservation=16.00MB
 |  tuple-ids=4,3 row-size=16B cardinality=unavailable
 |
 01:SORT
 |  order by: int_col ASC NULLS FIRST, id ASC
-|  hosts=3 per-host-mem=0B
+|  mem-estimate=0B mem-reservation=24.00MB
 |  tuple-ids=4 row-size=8B cardinality=unavailable
 |
 03:EXCHANGE [HASH(int_col)]
-|  hosts=3 per-host-mem=0B
+|  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=0 row-size=8B cardinality=unavailable
 |
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
    partitions=24/24 files=24 size=156.57KB
    predicates: id < 10
@@ -161,7 +173,7 @@ PLAN-ROOT SINK
    column stats: unavailable
    parquet statistics predicates: id < 10
    parquet dictionary predicates: id < 10
-   hosts=3 per-host-mem=16.00MB
+   mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=8B cardinality=unavailable
 ====
 # Nested-loop join in a subplan should work.
@@ -169,42 +181,44 @@ select *
 from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems
 where c_custkey < 10 and o_orderkey < 5 and l_linenumber < 3
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 01:SUBPLAN
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=2,1,0 row-size=562B cardinality=1500000
 |
 |--08:NESTED LOOP JOIN [CROSS JOIN]
-|  |  hosts=3 per-host-mem=unavailable
+|  |  mem-estimate=254B mem-reservation=0B
 |  |  tuple-ids=2,1,0 row-size=562B cardinality=100
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
-|  |     hosts=3 per-host-mem=unavailable
+|  |     mem-estimate=0B mem-reservation=0B
 |  |     tuple-ids=0 row-size=254B cardinality=1
 |  |
 |  04:SUBPLAN
-|  |  hosts=3 per-host-mem=unavailable
+|  |  mem-estimate=0B mem-reservation=0B
 |  |  tuple-ids=2,1 row-size=308B cardinality=100
 |  |
 |  |--07:NESTED LOOP JOIN [CROSS JOIN]
-|  |  |  hosts=3 per-host-mem=unavailable
+|  |  |  mem-estimate=124B mem-reservation=0B
 |  |  |  tuple-ids=2,1 row-size=308B cardinality=10
 |  |  |
 |  |  |--05:SINGULAR ROW SRC
 |  |  |     parent-subplan=04
-|  |  |     hosts=3 per-host-mem=unavailable
+|  |  |     mem-estimate=0B mem-reservation=0B
 |  |  |     tuple-ids=1 row-size=124B cardinality=1
 |  |  |
 |  |  06:UNNEST [o.o_lineitems]
 |  |     parent-subplan=04
-|  |     hosts=3 per-host-mem=unavailable
+|  |     mem-estimate=0B mem-reservation=0B
 |  |     tuple-ids=2 row-size=0B cardinality=10
 |  |
 |  03:UNNEST [c.c_orders o]
 |     parent-subplan=01
-|     hosts=3 per-host-mem=unavailable
+|     mem-estimate=0B mem-reservation=0B
 |     tuple-ids=1 row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
@@ -216,49 +230,52 @@ PLAN-ROOT SINK
    columns missing stats: c_orders
    parquet statistics predicates: c_custkey < 10
    parquet dictionary predicates: c_custkey < 10
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=254B cardinality=15000
 ---- PARALLELPLANS
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 09:EXCHANGE [UNPARTITIONED]
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=2,1,0 row-size=562B cardinality=1500000
 |
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
 01:SUBPLAN
-|  hosts=3 per-host-mem=0B
+|  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=2,1,0 row-size=562B cardinality=1500000
 |
 |--08:NESTED LOOP JOIN [CROSS JOIN]
-|  |  hosts=3 per-host-mem=254B
+|  |  mem-estimate=254B mem-reservation=0B
 |  |  tuple-ids=2,1,0 row-size=562B cardinality=100
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
-|  |     hosts=3 per-host-mem=0B
+|  |     mem-estimate=0B mem-reservation=0B
 |  |     tuple-ids=0 row-size=254B cardinality=1
 |  |
 |  04:SUBPLAN
-|  |  hosts=3 per-host-mem=0B
+|  |  mem-estimate=0B mem-reservation=0B
 |  |  tuple-ids=2,1 row-size=308B cardinality=100
 |  |
 |  |--07:NESTED LOOP JOIN [CROSS JOIN]
-|  |  |  hosts=3 per-host-mem=124B
+|  |  |  mem-estimate=124B mem-reservation=0B
 |  |  |  tuple-ids=2,1 row-size=308B cardinality=10
 |  |  |
 |  |  |--05:SINGULAR ROW SRC
 |  |  |     parent-subplan=04
-|  |  |     hosts=3 per-host-mem=0B
+|  |  |     mem-estimate=0B mem-reservation=0B
 |  |  |     tuple-ids=1 row-size=124B cardinality=1
 |  |  |
 |  |  06:UNNEST [o.o_lineitems]
 |  |     parent-subplan=04
-|  |     hosts=3 per-host-mem=0B
+|  |     mem-estimate=0B mem-reservation=0B
 |  |     tuple-ids=2 row-size=0B cardinality=10
 |  |
 |  03:UNNEST [c.c_orders o]
 |     parent-subplan=01
-|     hosts=3 per-host-mem=0B
+|     mem-estimate=0B mem-reservation=0B
 |     tuple-ids=1 row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
@@ -270,7 +287,7 @@ PLAN-ROOT SINK
    columns missing stats: c_orders
    parquet statistics predicates: c_custkey < 10
    parquet dictionary predicates: c_custkey < 10
-   hosts=3 per-host-mem=88.00MB
+   mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=254B cardinality=15000
 ====
 # Hash-join in a subplan should work.
@@ -278,34 +295,36 @@ select c.*
 from tpch_nested_parquet.customer c, c.c_orders o1, c.c_orders o2
 where o1.o_orderkey = o2.o_orderkey + 2 and o1.o_orderkey < 5
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 01:SUBPLAN
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=1,0,2 row-size=286B cardinality=1500000
 |
 |--06:HASH JOIN [INNER JOIN]
 |  |  hash predicates: o1.o_orderkey = o2.o_orderkey + 2
-|  |  hosts=3 per-host-mem=unavailable
+|  |  mem-estimate=0B mem-reservation=136.00MB
 |  |  tuple-ids=1,0,2 row-size=286B cardinality=10
 |  |
 |  |--04:UNNEST [c.c_orders o2]
 |  |     parent-subplan=01
-|  |     hosts=3 per-host-mem=unavailable
+|  |     mem-estimate=0B mem-reservation=0B
 |  |     tuple-ids=2 row-size=0B cardinality=10
 |  |
 |  05:NESTED LOOP JOIN [CROSS JOIN]
-|  |  hosts=3 per-host-mem=unavailable
+|  |  mem-estimate=270B mem-reservation=0B
 |  |  tuple-ids=1,0 row-size=278B cardinality=10
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
-|  |     hosts=3 per-host-mem=unavailable
+|  |     mem-estimate=0B mem-reservation=0B
 |  |     tuple-ids=0 row-size=270B cardinality=1
 |  |
 |  03:UNNEST [c.c_orders o1]
 |     parent-subplan=01
-|     hosts=3 per-host-mem=unavailable
+|     mem-estimate=0B mem-reservation=0B
 |     tuple-ids=1 row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
@@ -314,41 +333,44 @@ PLAN-ROOT SINK
    predicates on o1: o1.o_orderkey < 5
    table stats: 150000 rows total
    columns missing stats: c_orders, c_orders
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=270B cardinality=150000
 ---- PARALLELPLANS
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 07:EXCHANGE [UNPARTITIONED]
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=1,0,2 row-size=286B cardinality=1500000
 |
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
 01:SUBPLAN
-|  hosts=3 per-host-mem=0B
+|  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=1,0,2 row-size=286B cardinality=1500000
 |
 |--06:HASH JOIN [INNER JOIN]
 |  |  hash predicates: o1.o_orderkey = o2.o_orderkey + 2
-|  |  hosts=3 per-host-mem=0B
+|  |  mem-estimate=0B mem-reservation=136.00MB
 |  |  tuple-ids=1,0,2 row-size=286B cardinality=10
 |  |
 |  |--04:UNNEST [c.c_orders o2]
 |  |     parent-subplan=01
-|  |     hosts=3 per-host-mem=0B
+|  |     mem-estimate=0B mem-reservation=0B
 |  |     tuple-ids=2 row-size=0B cardinality=10
 |  |
 |  05:NESTED LOOP JOIN [CROSS JOIN]
-|  |  hosts=3 per-host-mem=270B
+|  |  mem-estimate=270B mem-reservation=0B
 |  |  tuple-ids=1,0 row-size=278B cardinality=10
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
-|  |     hosts=3 per-host-mem=0B
+|  |     mem-estimate=0B mem-reservation=0B
 |  |     tuple-ids=0 row-size=270B cardinality=1
 |  |
 |  03:UNNEST [c.c_orders o1]
 |     parent-subplan=01
-|     hosts=3 per-host-mem=0B
+|     mem-estimate=0B mem-reservation=0B
 |     tuple-ids=1 row-size=0B cardinality=10
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
@@ -357,6 +379,6 @@ PLAN-ROOT SINK
    predicates on o1: o1.o_orderkey < 5
    table stats: 150000 rows total
    columns missing stats: c_orders, c_orders
-   hosts=3 per-host-mem=88.00MB
+   mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=270B cardinality=150000
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 4712b96..df5f99d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -7,21 +7,23 @@ select count(*) from functional_parquet.alltypes
 where int_col > 1 and int_col * rand() > 50 and int_col is null
 and int_col > tinyint_col;
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=10.00MB mem-reservation=0B
 |  tuple-ids=1 row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=165.17KB
+   partitions=24/24 files=24 size=156.57KB
    predicates: int_col IS NULL, int_col > 1, int_col > tinyint_col, int_col * rand() > 50
    table stats: unavailable
    column stats: unavailable
    parquet statistics predicates: int_col > 1
    parquet dictionary predicates: int_col > 1
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=32.00MB mem-reservation=0B
    tuple-ids=0 row-size=5B cardinality=unavailable
 ====
 # Test a variety of types
@@ -32,20 +34,22 @@ and double_col > 100.00 and date_string_col > '1993-10-01' and string_col > 'aaa
 and timestamp_cmp(timestamp_col, '2016-11-20 00:00:00') = 1
 and year > 2000 and month < 12;
 ---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
-|  hosts=3 per-host-mem=unavailable
+|  mem-estimate=10.00MB mem-reservation=0B
 |  tuple-ids=1 row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=22/24 files=22 size=151.24KB
+   partitions=22/24 files=22 size=143.36KB
    predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, smallint_col > 50, tinyint_col < 50, string_col > 'aaaa', mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
    table stats: unavailable
    columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    parquet statistics predicates: bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, smallint_col > 50, tinyint_col < 50, string_col > 'aaaa', date_string_col > '1993-10-01'
    parquet dictionary predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, smallint_col > 50, tinyint_col < 50, string_col > 'aaaa', mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
-   hosts=3 per-host-mem=unavailable
+   mem-estimate=128.00MB mem-reservation=0B
    tuple-ids=0 row-size=80B cardinality=unavailable
 ====