You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/05/14 08:27:25 UTC

git commit: [SQL] Improve column pruning.

Repository: spark
Updated Branches:
  refs/heads/master 7bb9a521f -> 6ce088444


[SQL] Improve column pruning.

Fixed a bug that was preventing us from ever pruning beneath Joins.

## TPC-DS Q3
### Before:
```
Aggregate false, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand_id#64 AS brand_id#0,i_brand#65 AS brand#1,SUM(PartialSum#79) AS sum_agg#2]
 Exchange (HashPartitioning [d_year#12:0,i_brand#65:1,i_brand_id#64:2], 150)
  Aggregate true, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand#65,i_brand_id#64,SUM(CAST(ss_ext_sales_price#49, DoubleType)) AS PartialSum#79]
   Project [d_year#12:6,i_brand#65:59,i_brand_id#64:58,ss_ext_sales_price#49:43]
    HashJoin [ss_item_sk#36], [i_item_sk#57], BuildRight
     Exchange (HashPartitioning [ss_item_sk#36:30], 150)
      HashJoin [d_date_sk#6], [ss_sold_date_sk#34], BuildRight
       Exchange (HashPartitioning [d_date_sk#6:0], 150)
        Filter (d_moy#14:8 = 12)
         HiveTableScan [d_date_sk#6,d_date_id#7,d_date#8,d_month_seq#9,d_week_seq#10,d_quarter_seq#11,d_year#12,d_dow#13,d_moy#14,d_dom#15,d_qoy#16,d_fy_year#17,d_fy_quarter_seq#18,d_fy_week_seq#19,d_day_name#20,d_quarter_name#21,d_holiday#22,d_weekend#23,d_following_holiday#24,d_first_dom#25,d_last_dom#26,d_same_day_ly#27,d_same_day_lq#28,d_current_day#29,d_current_week#30,d_current_month#31,d_current_quarter#32,d_current_year#33], (MetastoreRelation default, date_dim, Some(dt)), None
       Exchange (HashPartitioning [ss_sold_date_sk#34:0], 150)
        HiveTableScan [ss_sold_date_sk#34,ss_sold_time_sk#35,ss_item_sk#36,ss_customer_sk#37,ss_cdemo_sk#38,ss_hdemo_sk#39,ss_addr_sk#40,ss_store_sk#41,ss_promo_sk#42,ss_ticket_number#43,ss_quantity#44,ss_wholesale_cost#45,ss_list_price#46,ss_sales_price#47,ss_ext_discount_amt#48,ss_ext_sales_price#49,ss_ext_wholesale_cost#50,ss_ext_list_price#51,ss_ext_tax#52,ss_coupon_amt#53,ss_net_paid#54,ss_net_paid_inc_tax#55,ss_net_profit#56], (MetastoreRelation default, store_sales, None), None
     Exchange (HashPartitioning [i_item_sk#57:0], 150)
      Filter (i_manufact_id#70:13 = 436)
       HiveTableScan [i_item_sk#57,i_item_id#58,i_rec_start_date#59,i_rec_end_date#60,i_item_desc#61,i_current_price#62,i_wholesale_cost#63,i_brand_id#64,i_brand#65,i_class_id#66,i_class#67,i_category_id#68,i_category#69,i_manufact_id#70,i_manufact#71,i_size#72,i_formulation#73,i_color#74,i_units#75,i_container#76,i_manager_id#77,i_product_name#78], (MetastoreRelation default, item, None), None
```
### After
```
Aggregate false, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand_id#224 AS brand_id#160,i_brand#225 AS brand#161,SUM(PartialSum#239) AS sum_agg#162]
 Exchange (HashPartitioning [d_year#172:0,i_brand#225:1,i_brand_id#224:2], 150)
  Aggregate true, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand#225,i_brand_id#224,SUM(CAST(ss_ext_sales_price#209, DoubleType)) AS PartialSum#239]
   Project [d_year#172:1,i_brand#225:5,i_brand_id#224:3,ss_ext_sales_price#209:0]
    HashJoin [ss_item_sk#196], [i_item_sk#217], BuildRight
     Exchange (HashPartitioning [ss_item_sk#196:2], 150)
      Project [ss_ext_sales_price#209:2,d_year#172:1,ss_item_sk#196:3]
       HashJoin [d_date_sk#166], [ss_sold_date_sk#194], BuildRight
        Exchange (HashPartitioning [d_date_sk#166:0], 150)
         Project [d_date_sk#166:0,d_year#172:1]
          Filter (d_moy#174:2 = 12)
           HiveTableScan [d_date_sk#166,d_year#172,d_moy#174], (MetastoreRelation default, date_dim, Some(dt)), None
        Exchange (HashPartitioning [ss_sold_date_sk#194:2], 150)
         HiveTableScan [ss_ext_sales_price#209,ss_item_sk#196,ss_sold_date_sk#194], (MetastoreRelation default, store_sales, None), None
     Exchange (HashPartitioning [i_item_sk#217:1], 150)
      Project [i_brand_id#224:0,i_item_sk#217:1,i_brand#225:2]
       Filter (i_manufact_id#230:3 = 436)
        HiveTableScan [i_brand_id#224,i_item_sk#217,i_brand#225,i_manufact_id#230], (MetastoreRelation default, item, None), None
```

Author: Michael Armbrust <mi...@databricks.com>

Closes #729 from marmbrus/fixPruning and squashes the following commits:

5feeff0 [Michael Armbrust] Improve column pruning.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ce08844
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ce08844
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ce08844

Branch: refs/heads/master
Commit: 6ce0884446d3571fd6e9d967a080a59c657543b1
Parents: 7bb9a52
Author: Michael Armbrust <mi...@databricks.com>
Authored: Tue May 13 23:27:22 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue May 13 23:27:22 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/optimizer/Optimizer.scala    | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ce08844/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 3037d45..406ffd6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -25,13 +25,13 @@ import org.apache.spark.sql.catalyst.types._
 
 object Optimizer extends RuleExecutor[LogicalPlan] {
   val batches =
-    Batch("ConstantFolding", Once,
+    Batch("ConstantFolding", FixedPoint(100),
       NullPropagation,
       ConstantFolding,
       BooleanSimplification,
       SimplifyFilters,
       SimplifyCasts) ::
-    Batch("Filter Pushdown", Once,
+    Batch("Filter Pushdown", FixedPoint(100),
       CombineFilters,
       PushPredicateThroughProject,
       PushPredicateThroughInnerJoin,
@@ -49,17 +49,19 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
  */
 object ColumnPruning extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // Eliminate attributes that are not needed to calculate the specified aggregates.
     case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
-      // Project away references that are not needed to calculate the required aggregates.
       a.copy(child = Project(a.references.toSeq, child))
 
+    // Eliminate unneeded attributes from either side of a Join.
     case Project(projectList, Join(left, right, joinType, condition)) =>
       // Collect the list of off references required either above or to evaluate the condition.
       val allReferences: Set[Attribute] =
         projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
-      /** Applies a projection when the child is producing unnecessary attributes */
+
+      /** Applies a projection only when the child is producing unnecessary attributes */
       def prunedChild(c: LogicalPlan) =
-        if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) {
+        if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
           Project(allReferences.filter(c.outputSet.contains).toSeq, c)
         } else {
           c
@@ -67,6 +69,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
 
       Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))
 
+    // Combine adjacent Projects.
     case Project(projectList1, Project(projectList2, child)) =>
       // Create a map of Aliases to their values from the child projection.
       // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
@@ -83,6 +86,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
       }).asInstanceOf[Seq[NamedExpression]]
 
       Project(substitutedProjection, child)
+
+    // Eliminate no-op Projects
+    case Project(projectList, child) if(child.output == projectList) => child
   }
 }