You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2023/03/09 14:13:24 UTC

[impala] 02/07: IMPALA-11604 (part 1): Model ProcessingCost for PlanNodes & DataSink

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 29ad046d05869bed7489bc487636e0f64b3328aa
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Thu Sep 22 16:36:34 2022 -0400

    IMPALA-11604 (part 1): Model ProcessingCost for PlanNodes & DataSink
    
    This patch augments IMPALA-10992 by establishing a model to allow the
    weighted total amount of data to process to be used as a new factor in
    the definition and selection of an executor group. We call this model
    ProcessingCost.
    
    ProcessingCost of a PlanNode/DataSink is a weighted amount of data
    processed by that node/sink. The basic ProcessingCost is computed with a
    general formula as follows.
    
      ProcessingCost is a pair: PC(D, N), where D = I * (C + M)
    
      where D is the weighted amount of data processed
            I is the input cardinality
            C is the expression evaluation cost per row.
              Set to total weight of expression evaluation in node/sink.
            M is a materialization cost per row.
              Only used by scan and exchange node. Otherwise, 0.
            N is the number of instances.
              Default to D / 10000000.
    
    In this patch, the weight of each expression evaluation is set to a
    constant of 1. A description of the computation for each kind of
    PlanNode/DataSink is given below.
    
    01. AggregationNode:
        Each AggregateInfo has its C as a sum of grouping expression and
        aggregate expression and then assigned a single ProcessingCost
        individually. These ProcessingCosts then summed to be the Aggregation
        node's ProcessingCost;
    
    02. AnalyticEvalNode:
        C is the sum of the evaluation costs for analytic functions;
    
    03. CardinalityCheckNode:
        Use the general formula, I = 1;
    
    04. DataSourceScanNode:
        Follow the formula from the superclass ScanNode;
    
    05. EmptySetNode:
          I = 0;
    
    06. ExchangeNode:
          M = (average serialized row size) / 1024
    
        A modification of the general formula when in broadcast mode:
          D = D * number of receivers;
    
    07. HashJoinNode:
          probe cost = PC(I0 * C(equiJoin predicate),  N)  +
                       PC(output cardinality * C(otherJoin predicate), N)
          build cost = PC(I1 * C(equi-join predicate), N)
    
        With I0 and I1 as input cardinality of the probe and build side
        accordingly. If the plan node does not have a separate build, ProcessingCost
        is the sum of probe cost and build cost. Otherwise, ProcessingCost is
        equal to probeCost.
    
    08. HbaseScanNode, HdfsScanNode, and KuduScanNode:
        Follow the formula from the superclass ScanNode;
    
    09. Nested loop join node:
        When the right child is not a SingularRowSrcNode:
    
          probe cost = PC(I0 * C(equiJoin predicate), N)  +
                       PC(output cardinality * C(otherJoin predicate), N)
          build cost = PC(I1 * C(equiJoin predicate), N)
    
        When the right child is a SingularRowSrcNode:
    
          probe cost = PC(I0, N)
          build cost = PC(I0 * I1, N)
    
        With I0 and I1 as input cardinality of the probe and build side
        accordingly. If the plan node does not have a separate build, ProcessingCost
        is the sum of probe cost and build cost. Otherwise, ProcessingCost is
        equal to probeCost.
    
    10. ScanNode:
          M = (average row size) / 1024;
    
    11. SelectNode:
        Use the general formula;
    
    12. SingularRowSrcNode:
        Since the node is involved once per input in nested loop join, the
        contribution of this node is computed in nested loop join;
    
    13. SortNode:
        C is the evaluation cost for the sort expression;
    
    14. SubplanNode:
        C is 1. I is the multiplication of the cardinality of the left and
        the right child;
    
    15. Union node:
        C is the cost of result expression evaluation from all non-pass-through
        children;
    
    16. Unnest node:
        I is the cardinality of the containing SubplanNode and C is 1.
    
    17. DataStreamSink:
          M = 1 / num rows per batch.
    
    18. JoinBuildSink:
        ProcessingCost is the build cost of its associated JoinNode.
    
    19. PlanRootSink:
        If result spooling is enabled, C is the cost of output expression
        evaluation. Otherwise. ProcessingCost is zero.
    
    20. TableSink:
        C is the cost of output expression evaluation.
        TableSink subclasses (including HBaseTableSink, HdfsTableSink, and
        KuduTableSink) follows the same formula;
    
    Part 2 of IMPALA-11604 will implement an algorithm that tries to adjust
    the number of instances for each fragment by considering their
    production-consumption ratio, and then finally returns a number
    representing an ideal CPU core count required for a query to run
    efficiently.
    
    Testing:
    - Pass FE tests.
    
    Co-authored-by: Riza Suminto <ri...@cloudera.com>
    
    Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
    Reviewed-on: http://gerrit.cloudera.org:8080/19033
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Reviewed-by: Kurt Deschler <kd...@cloudera.com>
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Tested-by: Riza Suminto <ri...@cloudera.com>
---
 .../org/apache/impala/analysis/AggregateInfo.java  |   9 +
 .../java/org/apache/impala/analysis/SortInfo.java  |   8 +
 .../org/apache/impala/planner/AggregationNode.java |  11 +
 .../apache/impala/planner/AnalyticEvalNode.java    |  12 +
 .../apache/impala/planner/BaseProcessingCost.java  |  69 +++++
 .../impala/planner/BroadcastProcessingCost.java    |  74 +++++
 .../impala/planner/CardinalityCheckNode.java       |   5 +
 .../java/org/apache/impala/planner/DataSink.java   |  33 ++-
 .../apache/impala/planner/DataSourceScanNode.java  |   5 +
 .../org/apache/impala/planner/DataStreamSink.java  |  12 +-
 .../org/apache/impala/planner/EmptySetNode.java    |  10 +
 .../org/apache/impala/planner/ExchangeNode.java    |  54 +++-
 .../org/apache/impala/planner/HBaseScanNode.java   |   6 +-
 .../org/apache/impala/planner/HBaseTableSink.java  |   6 +
 .../org/apache/impala/planner/HashJoinNode.java    |  27 ++
 .../org/apache/impala/planner/HdfsScanNode.java    |   5 +
 .../org/apache/impala/planner/HdfsTableSink.java   |   8 +-
 .../org/apache/impala/planner/JoinBuildSink.java   |   8 +
 .../java/org/apache/impala/planner/JoinNode.java   |  21 ++
 .../org/apache/impala/planner/KuduScanNode.java    |   5 +
 .../org/apache/impala/planner/KuduTableSink.java   |   7 +-
 .../apache/impala/planner/NestedLoopJoinNode.java  |  47 ++++
 .../java/org/apache/impala/planner/PlanNode.java   |  71 ++++-
 .../org/apache/impala/planner/PlanRootSink.java    |  14 +
 .../org/apache/impala/planner/ProcessingCost.java  | 306 +++++++++++++++++++++
 .../impala/planner/ScaledProcessingCost.java       |  65 +++++
 .../java/org/apache/impala/planner/ScanNode.java   |  19 ++
 .../java/org/apache/impala/planner/SelectNode.java |   5 +
 .../apache/impala/planner/SingularRowSrcNode.java  |   8 +
 .../java/org/apache/impala/planner/SortNode.java   |   6 +
 .../org/apache/impala/planner/SubplanNode.java     |   5 +
 .../apache/impala/planner/SumProcessingCost.java   |  61 ++++
 .../java/org/apache/impala/planner/TableSink.java  |   8 +-
 .../java/org/apache/impala/planner/UnionNode.java  |  21 ++
 .../java/org/apache/impala/planner/UnnestNode.java |   7 +-
 .../main/java/org/apache/impala/util/ExprUtil.java |  19 ++
 36 files changed, 1033 insertions(+), 24 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
index 865d1d15b..324d64829 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
@@ -25,6 +25,8 @@ import org.apache.impala.catalog.AggregateFunction;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.planner.ProcessingCost;
+import org.apache.impala.util.ExprUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -717,4 +719,11 @@ public class AggregateInfo extends AggregateInfoBase {
 
   @Override
   public AggregateInfo clone() { return new AggregateInfo(this); }
+
+  public ProcessingCost computeProcessingCost(String label, long inputCardinality) {
+    float weight = ExprUtil.computeExprsTotalCost(getGroupingExprs())
+        + ExprUtil.computeExprsTotalCost(getAggregateExprs());
+
+    return ProcessingCost.basicCost(label, inputCardinality, weight);
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
index 995c686eb..e6d802ede 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
@@ -26,7 +26,9 @@ import java.util.Set;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.planner.PlanNode;
+import org.apache.impala.planner.ProcessingCost;
 import org.apache.impala.thrift.TSortingOrder;
+import org.apache.impala.util.ExprUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
@@ -313,4 +315,10 @@ public class SortInfo {
     }
     return result;
   }
+
+  public ProcessingCost computeProcessingCost(String label, long inputCardinality) {
+    float weight = ExprUtil.computeExprsTotalCost(getSortExprs());
+
+    return ProcessingCost.basicCost(label, inputCardinality, weight);
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index b9741ff7b..e4eea634c 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -77,6 +77,7 @@ public class AggregationNode extends PlanNode {
   private boolean useStreamingPreagg_ = false;
 
   // Resource profiles for each aggregation class.
+  // Set in computeNodeResourceProfile().
   private List<ResourceProfile> resourceProfiles_;
 
   // Conservative minimum size of hash table for low-cardinality aggregations.
@@ -505,6 +506,16 @@ public class AggregationNode extends PlanNode {
     return output;
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = ProcessingCost.zero();
+    for (AggregateInfo aggInfo : aggInfos_) {
+      ProcessingCost aggCost =
+          aggInfo.computeProcessingCost(getDisplayLabel(), getChild(0).getCardinality());
+      processingCost_ = ProcessingCost.sumCost(processingCost_, aggCost);
+    }
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     resourceProfiles_ = Lists.newArrayListWithCapacity(aggInfos_.size());
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index d4fb6abf4..2ab23a643 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -45,6 +45,7 @@ import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.ExprUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -357,6 +358,17 @@ public class AnalyticEvalNode extends PlanNode {
     return output.toString();
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    // The total cost per row is the sum of the evaluation costs for analytic functions,
+    // partition by equal and order by equal predicate. 'partitionByEq_' and 'orderByEq_'
+    // are excluded since the input data stream is already partitioned and sorted within
+    // each partition (see notes on class AnalyticEvalNode in analytic-eval-node.h).
+    float totalCostToEvalOneRow = ExprUtil.computeExprsTotalCost(analyticFnCalls_);
+    processingCost_ = ProcessingCost.basicCost(
+        getDisplayLabel(), getCardinality(), totalCostToEvalOneRow);
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkNotNull(
diff --git a/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java
new file mode 100644
index 000000000..3f6de5ffd
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+/**
+ * A basic implementation of {@link ProcessingCost} that takes account expression cost
+ * and average row size as per-row costing weight.
+ */
+public class BaseProcessingCost extends ProcessingCost {
+  private final long cardinality_;
+  private final float exprsCost_;
+  private final float materializationCost_;
+
+  public BaseProcessingCost(
+      long cardinality, float exprsCost, float materializationCost) {
+    // TODO: materializationCost accommodate ProcessingCost where row width should be
+    // factor in. Currently, ProcessingCost of ScanNode, ExchangeNode, and DataStreamSink
+    // has row width factored in through materialization parameter here. Investigate if
+    // other operator need to have its row width factored in as well and whether we should
+    // have specific 'rowWidth' parameter here.
+    cardinality_ = cardinality;
+    exprsCost_ = exprsCost;
+    materializationCost_ = materializationCost;
+  }
+
+  private float costFactor() { return exprsCost_ + materializationCost_; }
+
+  @Override
+  public long getTotalCost() {
+    // Total cost must be non-negative.
+    return (long) Math.ceil(Math.max(cardinality_, 0) * costFactor());
+  }
+
+  @Override
+  public boolean isValid() {
+    return cardinality_ >= 0;
+  }
+
+  @Override
+  public ProcessingCost clone() {
+    return new BaseProcessingCost(cardinality_, exprsCost_, materializationCost_);
+  }
+
+  @Override
+  public String getDetails() {
+    StringBuilder output = new StringBuilder();
+    output.append(super.getDetails());
+    output.append(" cardinality=")
+        .append(cardinality_)
+        .append(" cost-factor=")
+        .append(costFactor());
+    return output.toString();
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java
new file mode 100644
index 000000000..d031ecea6
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.util.MathUtil;
+
+import java.util.function.Supplier;
+
+/**
+ * Similar as {@link ScaledProcessingCost}, but the multiple (countSupplier) represent
+ * fragment instance count associated with this ProcessingCost and may change after the
+ * object construction.
+ * <p>
+ * countSupplier must always return positive value.
+ */
+public class BroadcastProcessingCost extends ProcessingCost {
+  private final ProcessingCost childProcessingCost_;
+
+  protected BroadcastProcessingCost(
+      ProcessingCost cost, Supplier<Integer> countSupplier) {
+    Preconditions.checkArgument(
+        cost.isValid(), "BroadcastProcessingCost: cost is invalid!");
+    childProcessingCost_ = cost;
+    setNumInstanceExpected(countSupplier);
+  }
+
+  @Override
+  public long getTotalCost() {
+    return MathUtil.saturatingMultiply(
+        childProcessingCost_.getTotalCost(), getNumInstancesExpected());
+  }
+
+  @Override
+  public boolean isValid() {
+    return getNumInstancesExpected() > 0;
+  }
+
+  @Override
+  public ProcessingCost clone() {
+    return new BroadcastProcessingCost(childProcessingCost_, numInstanceSupplier_);
+  }
+
+  @Override
+  public String getExplainString(String detailPrefix, boolean fullExplain) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(detailPrefix);
+    sb.append("BroadcastCost(");
+    sb.append(getNumInstancesExpected());
+    sb.append("): ");
+    sb.append(getDetails());
+    if (fullExplain) {
+      sb.append("\n");
+      sb.append(childProcessingCost_.getExplainString(detailPrefix + "  ", true));
+    }
+    return sb.toString();
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
index 00cd67fdd..78d32d8ac 100644
--- a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
@@ -78,6 +78,11 @@ public class CardinalityCheckNode extends PlanNode {
     msg.setCardinality_check_node(cardinalityCheckNode);
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = computeDefaultProcessingCost();
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     nodeResourceProfile_ = ResourceProfile.noReservation(0);
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSink.java b/fe/src/main/java/org/apache/impala/planner/DataSink.java
index 2cacb4d3d..4b665b4c4 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSink.java
@@ -45,6 +45,10 @@ public abstract class DataSink {
   // set in computeResourceProfile()
   protected ResourceProfile resourceProfile_ = ResourceProfile.invalid();
 
+  // A total processing cost across all instances of this plan node.
+  // Set in computeProcessingCost() for a meaningful value.
+  protected ProcessingCost processingCost_ = ProcessingCost.invalid();
+
   /**
    * Return an explain string for the DataSink. Each line of the explain will be prefixed
    * by "prefix".
@@ -56,6 +60,19 @@ public abstract class DataSink {
     if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
       output.append(detailPrefix);
       output.append(resourceProfile_.getExplainString());
+      if (ProcessingCost.isComputeCost(queryOptions)) {
+        // Show processing cost total.
+        output.append(" cost=");
+        if (processingCost_.isValid()) {
+          output.append(processingCost_.getTotalCost());
+          if (explainLevel.ordinal() >= TExplainLevel.VERBOSE.ordinal()) {
+            output.append("\n");
+            output.append(processingCost_.getExplainString(detailPrefix, false));
+          }
+        } else {
+          output.append("<invalid>");
+        }
+      }
       output.append("\n");
     }
     return output.toString();
@@ -107,15 +124,29 @@ public abstract class DataSink {
   public void setFragment(PlanFragment fragment) { fragment_ = fragment; }
   public PlanFragment getFragment() { return fragment_; }
   public ResourceProfile getResourceProfile() { return resourceProfile_; }
+  public ProcessingCost getProcessingCost() { return processingCost_; }
+
+  public abstract void computeProcessingCost(TQueryOptions queryOptions);
 
   /**
    * Compute the resource profile for an instance of this DataSink.
    */
   public abstract void computeResourceProfile(TQueryOptions queryOptions);
 
+  /**
+   * Set number of rows consumed and produced data fields in processing cost.
+   */
+  public void computeRowConsumptionAndProductionToCost() {
+    Preconditions.checkState(processingCost_.isValid(),
+        "Processing cost of DataSink " + fragment_.getId() + ":" + getLabel()
+            + " is invalid!");
+    long inputOutputCardinality = fragment_.getPlanRoot().getCardinality();
+    processingCost_.setNumRowToConsume(inputOutputCardinality);
+    processingCost_.setNumRowToProduce(inputOutputCardinality);
+  }
+
   /**
    * Collect all expressions evaluated by this data sink.
    */
   public abstract void collectExprs(List<Expr> exprs);
-
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index c89b2760f..7cafa1840 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -332,6 +332,11 @@ public class DataSourceScanNode extends ScanNode {
         new TScanRange(), Lists.newArrayList(new TScanRangeLocation(hostIndex))));
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = computeDefaultProcessingCost();
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // This node fetches a thrift representation of the rows from the data
diff --git a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
index 06b8f31b0..fcf1f266e 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
@@ -71,9 +71,7 @@ public class DataStreamSink extends DataSink {
   private long estimateOutboundRowBatchBuffers(TQueryOptions queryOptions) {
     int numChannels =
         outputPartition_.isPartitioned() ? exchNode_.getFragment().getNumInstances() : 1;
-    long rowBatchSize = queryOptions.isSetBatch_size() && queryOptions.batch_size > 0 ?
-        queryOptions.batch_size :
-        PlanNode.DEFAULT_ROWBATCH_SIZE;
+    long rowBatchSize = PlanNode.getRowBatchSize(queryOptions);
     long avgOutboundRowBatchSize = Math.min(
         (long) Math.ceil(rowBatchSize * exchNode_.getAvgSerializedRowSize(exchNode_)),
         PlanNode.ROWBATCH_MAX_MEM_USAGE);
@@ -86,6 +84,14 @@ public class DataStreamSink extends DataSink {
     return bufferSize;
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    // The sending part of the processing cost for the exchange node.
+    processingCost_ =
+        ProcessingCost.basicCost(getLabel() + "(" + exchNode_.getDisplayLabel() + ")",
+            exchNode_.getCardinality(), 0, exchNode_.estimateSerializationCostPerRow());
+  }
+
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
     long estimatedMem = estimateOutboundRowBatchBuffers(queryOptions);
diff --git a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
index b39a2ae15..02bdcff1f 100644
--- a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
@@ -59,6 +59,11 @@ public class EmptySetNode extends PlanNode {
     computeStats(analyzer);
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = ProcessingCost.zero();
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
@@ -78,4 +83,9 @@ public class EmptySetNode extends PlanNode {
 
   @Override
   protected boolean displayCardinality(TExplainLevel detailLevel) { return false; }
+
+  @Override
+  protected boolean isLeafNode() {
+    return true;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index c3856956f..44232b53d 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -29,6 +29,7 @@ import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortInfo;
+import org.apache.impala.util.ExprUtil;
 
 import com.google.common.base.Preconditions;
 
@@ -66,7 +67,7 @@ public class ExchangeNode extends PlanNode {
     return mergeInfo_ != null;
   }
 
-  private boolean isBroadcastExchange() {
+  protected boolean isBroadcastExchange() {
     // If the output of the sink is not partitioned but the target fragment is
     // partitioned, then the data exchange is broadcast.
     Preconditions.checkState(!children_.isEmpty());
@@ -178,6 +179,39 @@ public class ExchangeNode extends PlanNode {
         (exchInput.getTupleIds().size() * PER_TUPLE_SERIALIZATION_OVERHEAD);
   }
 
+  // Return the number of sending instances of this exchange.
+  public int getNumSenders() {
+    Preconditions.checkState(!children_.isEmpty());
+    Preconditions.checkNotNull(children_.get(0).getFragment());
+    return children_.get(0).getFragment().getNumInstances();
+  }
+
+  // Return the number of receiving instances of this exchange.
+  public int getNumReceivers() {
+    DataSink sink = fragment_.getSink();
+    if (sink == null) return 1;
+    return sink.getFragment().getNumInstances();
+  }
+
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    // The computation for the processing cost for exchange splits into two parts:
+    //   1. The sending processing cost which is computed in the DataStreamSink of the
+    //      bottom sending fragment;
+    //   2. The receiving processing cost in the top receiving fragment which is computed
+    //      here.
+    // Assume serialization and deserialization costs per row are equal.
+    float conjunctsCost = ExprUtil.computeExprsTotalCost(conjuncts_);
+    float materializationCost = estimateSerializationCostPerRow();
+    processingCost_ = ProcessingCost.basicCost(getDisplayLabel() + "(receiving)",
+        getChild(0).getCardinality(), conjunctsCost, materializationCost);
+
+    if (isBroadcastExchange()) {
+      processingCost_ = ProcessingCost.broadcastCost(processingCost_,
+          () -> getNumReceivers());
+    }
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // For non-merging exchanges, one row batch queue is maintained for row
@@ -193,9 +227,7 @@ public class ExchangeNode extends PlanNode {
     // the system load. This makes it difficult to accurately estimate the
     // memory usage at runtime. The following estimates assume that memory usage will
     // lean towards the soft limits.
-    Preconditions.checkState(!children_.isEmpty());
-    Preconditions.checkNotNull(children_.get(0).getFragment());
-    int numSenders = children_.get(0).getFragment().getNumInstances();
+    int numSenders = getNumSenders();
     long estimatedTotalQueueByteSize = estimateTotalQueueByteSize(numSenders);
     long estimatedDeferredRPCQueueSize = estimateDeferredRPCQueueSize(queryOptions,
         numSenders);
@@ -205,12 +237,18 @@ public class ExchangeNode extends PlanNode {
     nodeResourceProfile_ = ResourceProfile.noReservation(estimatedMem);
   }
 
+  /**
+   * Estimate per-row serialization/deserialization cost as 1 per 1KB.
+   */
+  protected float estimateSerializationCostPerRow() {
+    return (float) getAvgSerializedRowSize(this) / 1024;
+  }
+
   // Returns the estimated size of the deferred batch queue (in bytes) by
   // assuming that at least one row batch rpc payload per sender is queued.
   private long estimateDeferredRPCQueueSize(TQueryOptions queryOptions,
       int numSenders) {
-    long rowBatchSize = queryOptions.isSetBatch_size() && queryOptions.batch_size > 0
-        ? queryOptions.batch_size : DEFAULT_ROWBATCH_SIZE;
+    long rowBatchSize = getRowBatchSize(queryOptions);
     // Set an upper limit based on estimated cardinality.
     if (getCardinality() > 0) rowBatchSize = Math.min(rowBatchSize, getCardinality());
     long avgRowBatchByteSize = Math.min(
@@ -251,6 +289,10 @@ public class ExchangeNode extends PlanNode {
 
   @Override
   protected void toThrift(TPlanNode msg) {
+    if (processingCost_.isValid() && processingCost_ instanceof BroadcastProcessingCost) {
+      Preconditions.checkState(
+          getNumReceivers() == processingCost_.getNumInstancesExpected());
+    }
     msg.node_type = TPlanNodeType.EXCHANGE_NODE;
     msg.exchange_node = new TExchangeNode();
     for (TupleId tid: tupleIds_) {
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index 2b764e602..96460b14b 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -59,7 +59,6 @@ import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.util.BitUtil;
-import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -655,6 +654,11 @@ public class HBaseScanNode extends ScanNode {
     }
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = computeScanProcessingCost(queryOptions);
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     FeHBaseTable tbl = (FeHBaseTable) desc_.getTable();
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
index cfbb335e7..abeb27f09 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
@@ -54,6 +54,12 @@ public class HBaseTableSink extends TableSink {
     return "HBASE WRITER";
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    // The processing cost to export rows.
+    processingCost_ = computeDefaultProcessingCost();
+  }
+
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
     resourceProfile_ = ResourceProfile.noReservation(0);
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index 0fe91eed3..7aeee7433 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -37,6 +37,7 @@ import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.util.BitUtil;
+import org.apache.impala.util.ExprUtil;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
@@ -314,4 +315,30 @@ public class HashJoinNode extends JoinNode {
         .setMaxRowBufferBytes(maxRowBufferSize).build();
     return Pair.create(probeProfile, buildProfile);
   }
+
+  @Override
+  public Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost() {
+    // TODO: The cost should consider conjuncts_ as well.
+    // Assume 'eqJoinConjuncts_' will be applied to all rows from lhs and rhs side,
+    // and 'otherJoinConjuncts_' to the resultant rows.
+    float eqJoinPredicateEvalCost = ExprUtil.computeExprsTotalCost(eqJoinConjuncts_);
+    float otherJoinPredicateEvalCost =
+        ExprUtil.computeExprsTotalCost(otherJoinConjuncts_);
+
+    // Compute the processing cost for lhs.
+    ProcessingCost probeProcessingCost =
+        ProcessingCost.basicCost(getDisplayLabel() + " Probe side (eqJoinConjuncts_)",
+            getChild(0).getCardinality(), eqJoinPredicateEvalCost);
+    if (otherJoinPredicateEvalCost > 0) {
+      probeProcessingCost = ProcessingCost.sumCost(probeProcessingCost,
+          ProcessingCost.basicCost(getDisplayLabel() + " Probe side(otherJoinConjuncts_)",
+              getCardinality(), otherJoinPredicateEvalCost));
+    }
+
+    // Compute the processing cost for rhs.
+    ProcessingCost buildProcessingCost =
+        ProcessingCost.basicCost(getDisplayLabel() + " Build side",
+            getChild(1).getCardinality(), eqJoinPredicateEvalCost);
+    return Pair.create(probeProcessingCost, buildProcessingCost);
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 9931d5849..6ba0b3c31 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -2108,6 +2108,11 @@ public class HdfsScanNode extends ScanNode {
     return output.toString();
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = computeScanProcessingCost(queryOptions);
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // Update 'useMtScanNode_' before any return cases. It's used in BE.
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 76b68a1ff..399e45829 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -41,10 +41,8 @@ import org.apache.impala.thrift.TTableSinkType;
 import org.apache.impala.util.BitUtil;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -142,6 +140,12 @@ public class HdfsTableSink extends TableSink {
     externalOutputPartitionDepth_ = partitionDepth;
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    // The processing cost to export rows.
+    processingCost_ = computeDefaultProcessingCost();
+  }
+
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
     PlanNode inputNode = fragment_.getPlanRoot();
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
index c8cc056d0..feed83381 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
@@ -141,11 +141,19 @@ public class JoinBuildSink extends DataSink {
                                        joinNode_.getFragment().getNumInstances();
   }
 
+  public boolean isShared() { return joinNode_.canShareBuild(); }
+
   @Override
   protected String getLabel() {
     return "JOIN BUILD";
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    // The processing cost to export rows.
+    processingCost_ = joinNode_.computeJoinProcessingCost().second;
+  }
+
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
     resourceProfile_ = joinNode_.computeJoinResourceProfile(queryOptions).second;
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index f31045235..02d39e991 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -909,6 +909,19 @@ public abstract class JoinNode extends PlanNode {
     return result;
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    Pair<ProcessingCost, ProcessingCost> probeBuildCost = computeJoinProcessingCost();
+    if (hasSeparateBuild()) {
+      // All build resource consumption is accounted for in the separate builder.
+      processingCost_ = probeBuildCost.first;
+    } else {
+      // Both build and profile resources are accounted for in the node.
+      processingCost_ =
+          ProcessingCost.sumCost(probeBuildCost.first, probeBuildCost.second);
+    }
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     Pair<ResourceProfile, ResourceProfile> profiles =
@@ -931,6 +944,14 @@ public abstract class JoinNode extends PlanNode {
   public abstract Pair<ResourceProfile, ResourceProfile> computeJoinResourceProfile(
       TQueryOptions queryOptions);
 
+  /**
+   * Helper method to compute the processing cost for the join that can be
+   * called from the builder or the join node. Returns a pair of the probe
+   * processing cost and the build processing cost.
+   * Does not modify the state of this node.
+   */
+  public abstract Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost();
+
   /* Helper to return all predicates as a string. */
   public String getAllPredicatesAsString(TExplainLevel level) {
     return "Conjuncts=" + Expr.getExplainString(getConjuncts(), level)
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index b65388ac6..6ec5255cb 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -395,6 +395,11 @@ public class KuduScanNode extends ScanNode {
     }
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = computeScanProcessingCost(queryOptions);
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // The bulk of memory used by Kudu scan node is generally utilized by the
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
index eba08b5ce..52fa82d65 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
@@ -18,7 +18,6 @@
 
 package org.apache.impala.planner;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.impala.analysis.DescriptorTable;
@@ -94,6 +93,12 @@ public class KuduTableSink extends TableSink {
     return "KUDU WRITER";
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    // The processing cost to export rows.
+    processingCost_ = computeDefaultProcessingCost();
+  }
+
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
     // The major chunk of memory used by this node is untracked. Part of which
diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
index b36cb9fc5..ecd5c03ae 100644
--- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
@@ -31,6 +31,7 @@ import org.apache.impala.thrift.TNestedLoopJoinNode;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.ExprUtil;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
@@ -78,6 +79,7 @@ public class NestedLoopJoinNode extends JoinNode {
   @Override
   public Pair<ResourceProfile, ResourceProfile> computeJoinResourceProfile(
       TQueryOptions queryOptions) {
+    // TODO: This seems a bug below that the total data is not divided by numInstances_.
     long perInstanceMemEstimate;
     if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1
         || numNodes_ == 0) {
@@ -92,6 +94,51 @@ public class NestedLoopJoinNode extends JoinNode {
     return Pair.create(ResourceProfile.noReservation(0), buildProfile);
   }
 
+  @Override
+  public Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost() {
+    // TODO: Make this general regardless of SingularRowSrcNode exist or not.
+    // TODO: The cost should consider conjuncts_ as well.
+    ProcessingCost probeProcessingCost = ProcessingCost.zero();
+    ProcessingCost buildProcessingCost = ProcessingCost.zero();
+    if (getChild(1) instanceof SingularRowSrcNode) {
+      // Compute the processing cost for lhs.
+      probeProcessingCost =
+          ProcessingCost.basicCost(getDisplayLabel() + "(c0, singularRowSrc) Probe side",
+              getChild(0).getCardinality(), 0);
+
+      // Compute the processing cost for rhs.
+      buildProcessingCost = ProcessingCost.basicCost(
+          getDisplayLabel() + "(c0, singularRowSrc) Build side per probe",
+          getChild(1).getCardinality(), 0);
+      // Multiply by the number of probes
+      buildProcessingCost = ProcessingCost.scaleCost(
+          buildProcessingCost, Math.max(0, getChild(0).getCardinality()));
+    } else {
+      // Assume 'eqJoinConjuncts_' will be applied to all rows from lhs side,
+      // and 'otherJoinConjuncts_' to the resultant rows.
+      float eqJoinPredicateEvalCost = ExprUtil.computeExprsTotalCost(eqJoinConjuncts_);
+      float otherJoinPredicateEvalCost =
+          ExprUtil.computeExprsTotalCost(otherJoinConjuncts_);
+
+      // Compute the processing cost for lhs.
+      probeProcessingCost = ProcessingCost.basicCost(
+          getDisplayLabel() + "(c0, non-singularRowSrc, eqJoinConjuncts_) Probe side",
+          getChild(0).getCardinality(), eqJoinPredicateEvalCost);
+
+      probeProcessingCost = ProcessingCost.sumCost(probeProcessingCost,
+          ProcessingCost.basicCost(getDisplayLabel()
+                  + "(c0, non-singularRowSrc, otherJoinConjuncts_) Probe side",
+              getCardinality(), otherJoinPredicateEvalCost));
+
+      // Compute the processing cost for rhs, assuming 'eqJoinConjuncts_' will be applied
+      // to all rows from rhs side.
+      buildProcessingCost = ProcessingCost.basicCost(
+          getDisplayLabel() + "(c0, non-singularRowSrc) Build side",
+          getChild(1).getCardinality(), eqJoinPredicateEvalCost);
+    }
+    return Pair.create(probeProcessingCost, buildProcessingCost);
+  }
+
   @Override
   protected String getNodeExplainString(String prefix, String detailPrefix,
       TExplainLevel detailLevel) {
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index ae19ce278..6dc9709c5 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -28,7 +28,6 @@ import java.util.Set;
 
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
-import org.apache.impala.analysis.CollectionTableRef;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprId;
 import org.apache.impala.analysis.ExprSubstitutionMap;
@@ -49,6 +48,7 @@ import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.util.BitUtil;
+import org.apache.impala.util.ExprUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -154,6 +154,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   // Runtime filters assigned to this node.
   protected List<RuntimeFilter> runtimeFilters_ = new ArrayList<>();
 
+  // A total processing cost across all instances of this plan node.
+  // Gets set correctly in computeProcessingCost().
+  protected ProcessingCost processingCost_ = ProcessingCost.invalid();
+
   protected PlanNode(PlanNodeId id, List<TupleId> tupleIds, String displayName) {
     this(id, displayName);
     tupleIds_.addAll(tupleIds);
@@ -235,6 +239,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   public void setAssignedConjuncts(Set<ExprId> conjuncts) {
     assignedConjuncts_ = conjuncts;
   }
+  public ProcessingCost getProcessingCost() { return processingCost_; }
 
   /**
    * Set the limit_ to the given limit_ only if the limit_ hasn't been set, or the new limit_
@@ -342,6 +347,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
       expBuilder.append(nodeResourceProfile_.getExplainString());
       expBuilder.append("\n");
 
+      if (ProcessingCost.isComputeCost(queryOptions) && processingCost_.isValid()
+          && detailLevel.ordinal() >= TExplainLevel.VERBOSE.ordinal()) {
+        // Print processing cost.
+        expBuilder.append(processingCost_.getExplainString(detailPrefix, false));
+        expBuilder.append("\n");
+      }
+
       // Print tuple ids, row size and cardinality.
       expBuilder.append(detailPrefix + "tuple-ids=");
       for (int i = 0; i < tupleIds_.size(); ++i) {
@@ -358,10 +370,19 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     if (displayCardinality) {
       if (detailLevel == TExplainLevel.STANDARD) expBuilder.append(detailPrefix);
       expBuilder.append("row-size=")
-        .append(PrintUtils.printBytes(Math.round(avgRowSize_)))
-        .append(" cardinality=")
-        .append(PrintUtils.printEstCardinality(cardinality_))
-        .append("\n");
+          .append(PrintUtils.printBytes(Math.round(avgRowSize_)))
+          .append(" cardinality=")
+          .append(PrintUtils.printEstCardinality(cardinality_));
+      if (ProcessingCost.isComputeCost(queryOptions)) {
+        // Show processing cost total.
+        expBuilder.append(" cost=");
+        if (processingCost_.isValid()) {
+          expBuilder.append(processingCost_.getTotalCost());
+        } else {
+          expBuilder.append("<invalid>");
+        }
+      }
+      expBuilder.append("\n");
     }
 
     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
@@ -653,6 +674,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     return cardinality;
   }
 
+  // Default implementation of computing the total data processed in bytes.
+  protected ProcessingCost computeDefaultProcessingCost() {
+    Preconditions.checkState(hasValidStats());
+    return ProcessingCost.basicCost(getDisplayLabel(), getInputCardinality(),
+        ExprUtil.computeExprsTotalCost(getConjuncts()));
+  }
+
   public static long capCardinalityAtLimit(long cardinality, long limit) {
     return cardinality == -1 ? limit : Math.min(cardinality, limit);
   }
@@ -882,14 +910,41 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     }
   }
 
+  public abstract void computeProcessingCost(TQueryOptions queryOptions);
+
   /**
    * Compute peak resources consumed when executing this PlanNode, initializing
-   * 'nodeResourceProfile_'. May only be called after this PlanNode has been placed in
-   * a PlanFragment because the cost computation is dependent on the enclosing fragment's
-   * data partition.
+   * 'nodeResourceProfile_' and 'processingCost_'. May only be called after this PlanNode
+   * has been placed in a PlanFragment because the cost computation is dependent on the
+   * enclosing fragment's data partition.
    */
   public abstract void computeNodeResourceProfile(TQueryOptions queryOptions);
 
+  /**
+   * Determine whether a PlanNode is a leaf node within the plan tree.
+   * @return true if a PlanNode is a leaf node within the plan tree.
+   */
+  protected boolean isLeafNode() { return false; }
+
+  /**
+   * Set number of rows consumed and produced data fields in processing cost.
+   */
+  public void computeRowConsumptionAndProductionToCost() {
+    Preconditions.checkState(processingCost_.isValid(),
+        "Processing cost of PlanNode " + getDisplayLabel() + " is invalid!");
+    processingCost_.setNumRowToConsume(getInputCardinality());
+    processingCost_.setNumRowToProduce(getCardinality());
+  }
+
+  /**
+   * Get row batch size after considering 'batch_size' query option.
+   */
+  protected static long getRowBatchSize(TQueryOptions queryOptions) {
+    return (queryOptions.isSetBatch_size() && queryOptions.batch_size > 0) ?
+        queryOptions.batch_size :
+        PlanNode.DEFAULT_ROWBATCH_SIZE;
+  }
+
   /**
    * Wrapper class to represent resource profiles during different phases of execution.
    */
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index 53887b738..133aec186 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -26,6 +26,7 @@ import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanRootSink;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.ExprUtil;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Preconditions;
@@ -67,6 +68,19 @@ public class PlanRootSink extends DataSink {
     return "ROOT";
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    if (queryOptions.isSpool_query_results() && queryOptions.getScratch_limit() != 0
+        && !BackendConfig.INSTANCE.getScratchDirs().isEmpty()) {
+      // The processing cost to buffer these many rows in root.
+      processingCost_ =
+          ProcessingCost.basicCost(getLabel(), fragment_.getPlanRoot().getCardinality(),
+              ExprUtil.computeExprsTotalCost(outputExprs_));
+    } else {
+      processingCost_ = ProcessingCost.zero();
+    }
+  }
+
   /**
    * Computes and sets the {@link ResourceProfile} for this PlanRootSink. If result
    * spooling is disabled, a ResourceProfile is returned with no reservation or buffer
diff --git a/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
new file mode 100644
index 000000000..df695bddc
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
@@ -0,0 +1,306 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.math.LongMath;
+
+import org.apache.impala.thrift.TQueryOptions;
+
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * A base class that encapsulate processing cost which models a total cost or amount
+ * of work shared across all instances of specific {@link PlanNode}, {@link DataSink}, or
+ * {@link PlanFragment}.
+ */
+public abstract class ProcessingCost implements Cloneable {
+  public static ProcessingCost invalid() { return new BaseProcessingCost(-1, 1, 0); }
+  public static ProcessingCost zero() { return new BaseProcessingCost(0, 1, 0); }
+
+  public static ProcessingCost maxCost(ProcessingCost a, ProcessingCost b) {
+    return (a.getTotalCost() >= b.getTotalCost()) ? a : b;
+  }
+
+  public static ProcessingCost sumCost(ProcessingCost a, ProcessingCost b) {
+    return new SumProcessingCost(a, b);
+  }
+
+  public static ProcessingCost scaleCost(ProcessingCost cost, long factor) {
+    return new ScaledProcessingCost(cost, factor);
+  }
+
+  public static ProcessingCost broadcastCost(
+      ProcessingCost cost, Supplier<Integer> numInstanceSupplier) {
+    return new BroadcastProcessingCost(cost, numInstanceSupplier);
+  }
+
+  protected static void tryAdjustConsumerParallelism(int nodeStepCount,
+      int minParallelism, int maxParallelism, ProcessingCost producer,
+      ProcessingCost consumer) {
+    Preconditions.checkState(consumer.getNumInstancesExpected() > 0);
+    Preconditions.checkState(producer.getNumInstancesExpected() > 0);
+    if (producer.getCostPerRowProduced() > 0
+        && (consumer.canReducedBy(nodeStepCount, minParallelism, producer)
+            || (consumer.canIncreaseBy(nodeStepCount, maxParallelism, producer)))) {
+      // Adjust consumer's concurrency following producer's parallelism and their
+      // produce-consume rate ratio.
+      float consProdRatio = consumer.consumerProducerRatio(producer);
+      int adjustedCount = (int) Math.ceil(consProdRatio
+                              * producer.getNumInstancesExpected() / nodeStepCount)
+          * nodeStepCount;
+      final int finalCount =
+          Math.max(minParallelism, Math.min(maxParallelism, adjustedCount));
+      consumer.setNumInstanceExpected(() -> finalCount);
+    } else if (maxParallelism < consumer.getNumInstancesExpected()) {
+      consumer.setNumInstanceExpected(() -> maxParallelism);
+    }
+  }
+
+  private static ProcessingCost computeValidBaseCost(
+      long cardinality, float exprsCost, float materializationCost) {
+    return new BaseProcessingCost(
+        Math.max(0, cardinality), exprsCost, materializationCost);
+  }
+
+  public static ProcessingCost basicCost(
+      String label, long cardinality, float exprsCost, float materializationCost) {
+    ProcessingCost processingCost =
+        computeValidBaseCost(cardinality, exprsCost, materializationCost);
+    processingCost.setLabel(label);
+    return processingCost;
+  }
+
+  public static ProcessingCost basicCost(
+      String label, long cardinality, float exprsCost) {
+    ProcessingCost processingCost = computeValidBaseCost(cardinality, exprsCost, 0);
+    processingCost.setLabel(label);
+    return processingCost;
+  }
+
+  public static boolean isComputeCost(TQueryOptions queryOptions) {
+    // TODO: Replace with proper check in IMPALA-11604 part 2.
+    return false;
+  }
+
+  /**
+   * Merge multiple ProcessingCost into a single new ProcessingCost.
+   * <p>
+   * The resulting ProcessingCost will have the total cost, number of rows produced,
+   * and number of rows consumed as a sum of respective properties of all ProcessingCost
+   * in the given list. Meanwhile, the number of instances expected is the maximum among
+   * all ProcessingCost is the list.
+   *
+   * @param costs list of all ProcessingCost to merge.
+   * @return A new combined ProcessingCost.
+   */
+  protected static ProcessingCost fullMergeCosts(List<ProcessingCost> costs) {
+    Preconditions.checkNotNull(costs);
+    Preconditions.checkArgument(!costs.isEmpty());
+
+    ProcessingCost resultingCost = ProcessingCost.zero();
+    long inputCardinality = 0;
+    long outputCardinality = 0;
+    int maxProducerParallelism = 1;
+    for (ProcessingCost cost : costs) {
+      resultingCost = ProcessingCost.sumCost(resultingCost, cost);
+      inputCardinality += cost.getNumRowToConsume();
+      outputCardinality += cost.getNumRowToProduce();
+      maxProducerParallelism =
+          Math.max(maxProducerParallelism, cost.getNumInstancesExpected());
+    }
+    resultingCost.setNumRowToConsume(inputCardinality);
+    resultingCost.setNumRowToProduce(outputCardinality);
+    final int finalProducerParallelism = maxProducerParallelism;
+    resultingCost.setNumInstanceExpected(() -> finalProducerParallelism);
+    return resultingCost;
+  }
+
+  protected Supplier<Integer> numInstanceSupplier_ = null;
+  private long numRowToProduce_ = 0;
+  private long numRowToConsume_ = 0;
+  private String label_ = null;
+  private boolean isSetNumRowToProduce_ = false;
+  private boolean isSetNumRowToConsume_ = false;
+
+  public abstract long getTotalCost();
+
+  public abstract boolean isValid();
+
+  public abstract ProcessingCost clone();
+
+  public String getDetails() {
+    StringBuilder output = new StringBuilder();
+    output.append("cost-total=")
+        .append(getTotalCost())
+        .append(" max-instances=")
+        .append(getNumInstanceMax());
+    if (hasAdjustedInstanceCount()) {
+      output.append(" adj-instances=").append(getNumInstancesExpected());
+    }
+    output.append(" cost/inst=")
+        .append(getPerInstanceCost())
+        .append(" #cons:#prod=")
+        .append(numRowToConsume_)
+        .append(":")
+        .append(numRowToProduce_);
+    if (isSetNumRowToConsume_ && isSetNumRowToProduce_) {
+      output.append(" reduction=").append(getReduction());
+    }
+    if (isSetNumRowToConsume_) {
+      output.append(" cost/cons=").append(getCostPerRowConsumed());
+    }
+    if (isSetNumRowToProduce_) {
+      output.append(" cost/prod=").append(getCostPerRowProduced());
+    }
+    return output.toString();
+  }
+
+  public String debugString() {
+    StringBuilder output = new StringBuilder();
+    if (label_ != null) {
+      output.append(label_);
+      output.append("=");
+    }
+    output.append(this);
+    return output.toString();
+  }
+
+  @Override
+  public String toString() {
+    return "{" + getDetails() + "}";
+  }
+
+  public String getExplainString(String detailPrefix, boolean fullExplain) {
+    return detailPrefix + getDetails();
+  }
+
+  public void setNumInstanceExpected(Supplier<Integer> countSupplier) {
+    Preconditions.checkArgument(
+        countSupplier.get() > 0, "Number of instance must be greater than 0!");
+    numInstanceSupplier_ = countSupplier;
+  }
+
+  public int getNumInstancesExpected() {
+    return hasAdjustedInstanceCount() ? numInstanceSupplier_.get() : getNumInstanceMax();
+  }
+
+  private boolean hasAdjustedInstanceCount() {
+    return numInstanceSupplier_ != null && numInstanceSupplier_.get() > 0;
+  }
+
+  private int getNumInstanceMax() {
+    // TODO: replace minProcessingCostPerThread with backend flag.
+    long minProcessingCostPerThread = 10000000L;
+    long maxInstance = LongMath.divide(getTotalCost(),
+        minProcessingCostPerThread, RoundingMode.CEILING);
+    if (maxInstance > 0) {
+      return maxInstance < Integer.MAX_VALUE ? (int) maxInstance : Integer.MAX_VALUE;
+    } else {
+      return 1;
+    }
+  }
+
+  /**
+   * Set num rows to produce.
+   *
+   * @param numRowToProduce Number of rows to produce by plan node or data sink associated
+   *     with this cost. Assume 0 rows if negative value is given.
+   */
+  public void setNumRowToProduce(long numRowToProduce) {
+    numRowToProduce_ = Math.max(0, numRowToProduce);
+    isSetNumRowToProduce_ = true;
+  }
+
+  /**
+   * Set num rows to consume.
+   *
+   * @param numRowToConsume Number of rows to consume by plan node or data sink associated
+   *     with this cost. Assume 0 rows if negative value is given.
+   */
+  protected void setNumRowToConsume(long numRowToConsume) {
+    numRowToConsume_ = Math.max(0, numRowToConsume);
+    isSetNumRowToConsume_ = true;
+  }
+
+  public void setLabel(String label) { label_ = label; }
+  public long getNumRowToConsume() { return numRowToConsume_; }
+  public long getNumRowToProduce() { return numRowToProduce_; }
+
+  private int getPerInstanceCost() {
+    Preconditions.checkState(getNumInstancesExpected() > 0);
+    return (int) Math.ceil((float) getTotalCost() / getNumInstancesExpected());
+  }
+
+  private float getReduction() {
+    return (float) numRowToConsume_ / Math.max(1, numRowToProduce_);
+  }
+
+  private float getCostPerRowProduced() {
+    return (float) getTotalCost() / Math.max(1, numRowToProduce_);
+  }
+
+  private float getCostPerRowConsumed() {
+    return (float) getTotalCost() / Math.max(1, numRowToConsume_);
+  }
+
+  private float instanceRatio(ProcessingCost other) {
+    Preconditions.checkState(getNumInstancesExpected() > 0);
+    return (float) getNumInstancesExpected() / other.getNumInstancesExpected();
+  }
+
+  private float consumerProducerRatio(ProcessingCost other) {
+    return getCostPerRowConsumed() / Math.max(1, other.getCostPerRowProduced());
+  }
+
+  private boolean isAtLowestInstanceRatio(
+      int nodeStepCount, int minParallelism, ProcessingCost other) {
+    if (getNumInstancesExpected() - nodeStepCount < minParallelism) {
+      return true;
+    } else {
+      float lowerRatio = (float) (getNumInstancesExpected() - nodeStepCount)
+          / other.getNumInstancesExpected();
+      return lowerRatio < consumerProducerRatio(other);
+    }
+  }
+
+  private boolean isAtHighestInstanceRatio(
+      int nodeStepCount, int maxInstance, ProcessingCost other) {
+    if (getNumInstancesExpected() + nodeStepCount > maxInstance) {
+      return true;
+    } else {
+      float higherRatio = (float) (getNumInstancesExpected() + nodeStepCount)
+          / other.getNumInstancesExpected();
+      return higherRatio > consumerProducerRatio(other);
+    }
+  }
+
+  private boolean canReducedBy(
+      int nodeStepCount, int minParallelism, ProcessingCost other) {
+    return !isAtLowestInstanceRatio(nodeStepCount, minParallelism, other)
+        && consumerProducerRatio(other) < instanceRatio(other);
+  }
+
+  private boolean canIncreaseBy(
+      int nodeStepCount, int maxInstance, ProcessingCost other) {
+    return !isAtHighestInstanceRatio(nodeStepCount, maxInstance, other)
+        && consumerProducerRatio(other) > instanceRatio(other);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java
new file mode 100644
index 000000000..421ee9e64
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.util.MathUtil;
+
+public class ScaledProcessingCost extends ProcessingCost {
+  private final ProcessingCost cost_;
+  private final long multiplier_;
+
+  protected ScaledProcessingCost(ProcessingCost cost, long multiplier) {
+    Preconditions.checkArgument(cost.isValid(), "ScaledProcessingCost: cost is invalid!");
+    Preconditions.checkArgument(
+        multiplier >= 0, "ScaledProcessingCost: multiplier must be non-negative!");
+    cost_ = cost;
+    multiplier_ = multiplier;
+  }
+
+  @Override
+  public long getTotalCost() {
+    return MathUtil.saturatingMultiply(cost_.getTotalCost(), multiplier_);
+  }
+
+  @Override
+  public boolean isValid() {
+    return true;
+  }
+
+  @Override
+  public ProcessingCost clone() {
+    return new ScaledProcessingCost(cost_, multiplier_);
+  }
+
+  @Override
+  public String getExplainString(String detailPrefix, boolean fullExplain) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(detailPrefix);
+    sb.append("ScaledCost(");
+    sb.append(multiplier_);
+    sb.append("): ");
+    sb.append(getDetails());
+    if (fullExplain) {
+      sb.append("\n");
+      sb.append(cost_.getExplainString(detailPrefix + "  ", true));
+    }
+    return sb.toString();
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index d250eb8db..be12b293c 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -40,6 +40,7 @@ import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.util.ExprUtil;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.MoreObjects;
@@ -343,6 +344,24 @@ abstract public class ScanNode extends PlanNode {
     }
     return maxScannerThreads;
   }
+
+  protected ProcessingCost computeScanProcessingCost(TQueryOptions queryOptions) {
+    return ProcessingCost.basicCost(getDisplayLabel(), getInputCardinality(),
+        ExprUtil.computeExprsTotalCost(conjuncts_), rowMaterializationCost());
+  }
+
+  /**
+   * Estimate per-row cost as 1 per 1KB row size.
+   * <p>
+   * This reflect deserialization/copy cost per row.
+   */
+  private float rowMaterializationCost() { return getAvgRowSize() / 1024; }
+
+  @Override
+  protected boolean isLeafNode() {
+    return true;
+  }
+
   /**
    * Returns true if this node has conjuncts to be evaluated by Impala against the scan
    * tuple.
diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
index fa888f47f..1bde0d41a 100644
--- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
@@ -117,6 +117,11 @@ public class SelectNode extends PlanNode {
 
   public void setSelectivity(double value) { selectivity_ = value; }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = computeDefaultProcessingCost();
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // The select node initializes a single row-batch which it recycles on every
diff --git a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
index e9c498417..f37b67c11 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
@@ -23,6 +23,7 @@ import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.ExprUtil;
 
 import com.google.common.base.Preconditions;
 
@@ -66,6 +67,13 @@ public class SingularRowSrcNode extends PlanNode {
     numInstances_ = containingSubplanNode_.getChild(0).getNumInstances();
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = ProcessingCost.basicCost(getDisplayLabel(),
+        containingSubplanNode_.getChild(0).getCardinality(),
+        ExprUtil.computeExprsTotalCost(getConjuncts()));
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 5f12dee45..6d8ad8b03 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -453,6 +453,12 @@ public class SortNode extends PlanNode {
     return offset_ != 0 ? prefix + "offset: " + Long.toString(offset_) + "\n" : "";
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ =
+        getSortInfo().computeProcessingCost(getDisplayLabel(), getCardinality());
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkState(hasValidStats());
diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
index eb57df628..09a6ac7d8 100644
--- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
@@ -96,6 +96,11 @@ public class SubplanNode extends PlanNode {
     cardinality_ = capCardinalityAtLimit(cardinality_);
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = ProcessingCost.basicCost(getDisplayLabel(), getCardinality(), 0);
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
diff --git a/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java
new file mode 100644
index 000000000..fcbd6dfab
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.util.MathUtil;
+
+public class SumProcessingCost extends ProcessingCost {
+  private final ProcessingCost cost1_;
+  private final ProcessingCost cost2_;
+
+  protected SumProcessingCost(ProcessingCost cost1, ProcessingCost cost2) {
+    Preconditions.checkArgument(cost1.isValid(), "SumProcessingCost: cost1 is invalid!");
+    Preconditions.checkArgument(cost2.isValid(), "SumProcessingCost: cost2 is invalid!");
+    cost1_ = cost1;
+    cost2_ = cost2;
+  }
+
+  @Override
+  public long getTotalCost() {
+    return MathUtil.saturatingAdd(cost1_.getTotalCost(), cost2_.getTotalCost());
+  }
+
+  @Override
+  public boolean isValid() {
+    return true;
+  }
+
+  @Override
+  public ProcessingCost clone() {
+    return new SumProcessingCost(cost1_, cost2_);
+  }
+
+  @Override
+  public String getExplainString(String detailPrefix, boolean fullExplain) {
+    StringBuilder output = new StringBuilder();
+    output.append(detailPrefix).append("SumCost: ").append(getDetails());
+    if (fullExplain) {
+      String nextPrefix = detailPrefix + "  ";
+      output.append("\n").append(cost1_.getExplainString(nextPrefix, true));
+      output.append("\n").append(cost2_.getExplainString(nextPrefix, true));
+    }
+    return output.toString();
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index cc12bd995..0f34ddb20 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.impala.planner;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.impala.analysis.Expr;
@@ -28,6 +27,7 @@ import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TSinkAction;
 import org.apache.impala.thrift.TSortingOrder;
+import org.apache.impala.util.ExprUtil;
 
 import com.google.common.base.Preconditions;
 
@@ -160,4 +160,10 @@ public abstract class TableSink extends DataSink {
           "Cannot create data sink into table of type: " + table.getClass().getName());
     }
   }
+
+  protected ProcessingCost computeDefaultProcessingCost() {
+    // TODO: consider including materialization cost into the returned cost.
+    return ProcessingCost.basicCost(getLabel(), fragment_.getPlanRoot().getCardinality(),
+        ExprUtil.computeExprsTotalCost(outputExprs_));
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index b1dbe7212..c8f45dd7b 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -33,6 +33,7 @@ import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TUnionNode;
+import org.apache.impala.util.ExprUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -148,6 +149,19 @@ public class UnionNode extends PlanNode {
     }
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    // Compute the cost for materializing child rows and use that to figure out
+    // the total data processed. Assume the costs of processing pass-through rows are 0.
+    float totalMaterializeCost = 0;
+    for (int i = firstMaterializedChildIdx_; i < resultExprLists_.size(); i++) {
+      totalMaterializeCost += ExprUtil.computeExprsTotalCost(resultExprLists_.get(i));
+    }
+
+    processingCost_ =
+        ProcessingCost.basicCost(getDisplayLabel(), cardinality_, totalMaterializeCost);
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // The union node directly returns the rows for children marked as pass
@@ -375,4 +389,11 @@ public class UnionNode extends PlanNode {
       }
     }
   }
+
+  @Override
+  protected boolean isLeafNode() {
+    // Union node is being scheduled the same as scan node.
+    // See Scheduler::CreateCollocatedAndScanInstances() in scheduler.cc.
+    return true;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
index dd6eb33f5..919d849e3 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
@@ -17,7 +17,6 @@
 
 package org.apache.impala.planner;
 
-import java.util.Comparator;
 import java.util.List;
 
 import org.apache.impala.analysis.Analyzer;
@@ -96,6 +95,12 @@ public class UnnestNode extends PlanNode {
     cardinality_ = capCardinalityAtLimit(cardinality_);
   }
 
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = ProcessingCost.basicCost(
+        getDisplayLabel(), containingSubplanNode_.getChild(0).getCardinality(), 0);
+  }
+
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
diff --git a/fe/src/main/java/org/apache/impala/util/ExprUtil.java b/fe/src/main/java/org/apache/impala/util/ExprUtil.java
index 934fabd0b..2b34835da 100644
--- a/fe/src/main/java/org/apache/impala/util/ExprUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/ExprUtil.java
@@ -27,9 +27,12 @@ import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TColumnValue;
 
+import java.util.List;
+
 public class ExprUtil {
   /**
    * Converts a UTC timestamp to UNIX microseconds.
@@ -102,4 +105,20 @@ public class ExprUtil {
     toUtcTimestamp.analyze(analyzer);
     return toUtcTimestamp;
   }
+
+  // Compute total cost for a list of expressions. Return 0 for a null list.
+  public static float computeExprsTotalCost(List<? extends Expr> exprs) {
+    // TODO: Implement the cost for conjunts once the implemetation for
+    // 'Expr' is in place.
+    if (exprs == null) return 0;
+    return exprs.size();
+  }
+
+  public static float computeExprCost(Expr e) {
+    if (e == null) return 0;
+    return 1;
+    // TODO Implement a function that can take into consideration of data types,
+    // expressions and potentially LLVM translation in BE. The function must also
+    // run fast.
+  }
 }