Posted to commits@spark.apache.org by rx...@apache.org on 2014/04/11 01:20:37 UTC

git commit: [SQL] Improve column pruning in the optimizer.

Repository: spark
Updated Branches:
  refs/heads/master 930b70f05 -> f99401a63


[SQL] Improve column pruning in the optimizer.

Author: Michael Armbrust <mi...@databricks.com>

Closes #378 from marmbrus/columnPruning and squashes the following commits:

779da56 [Michael Armbrust] More consistent naming.
1a4e9ea [Michael Armbrust] More comments.
2f4e7b9 [Michael Armbrust] Improve column pruning in the optimizer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f99401a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f99401a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f99401a6

Branch: refs/heads/master
Commit: f99401a6308d5b9a9259d7597a35ba92f927aa50
Parents: 930b70f
Author: Michael Armbrust <mi...@databricks.com>
Authored: Thu Apr 10 16:20:33 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Apr 10 16:20:33 2014 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 51 +++++++++++++++++++-
 .../catalyst/plans/logical/basicOperators.scala |  2 +-
 2 files changed, 51 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f99401a6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 37b23ba..c0a09a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -33,7 +33,56 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
     Batch("Filter Pushdown", Once,
       CombineFilters,
       PushPredicateThroughProject,
-      PushPredicateThroughInnerJoin) :: Nil
+      PushPredicateThroughInnerJoin,
+      ColumnPruning) :: Nil
+}
+
+/**
+ * Attempts to eliminate the reading of unneeded columns from the query plan using the following
+ * transformations:
+ *
+ *  - Inserting Projections beneath the following operators:
+ *   - Aggregate
+ *   - Project <- Join (i.e., a Project directly above a Join)
+ *  - Collapsing adjacent projections, performing alias substitution.
+ */
+object ColumnPruning extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
+      // Project away attributes that are not needed to calculate the required aggregates.
+      a.copy(child = Project(a.references.toSeq, child))
+
+    case Project(projectList, Join(left, right, joinType, condition)) =>
+      // Collect the list of all references required either above or to evaluate the condition.
+      val allReferences: Set[Attribute] =
+        projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
+      /** Applies a projection when the child is producing unnecessary attributes */
+      def prunedChild(c: LogicalPlan) =
+        if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
+          Project(allReferences.filter(c.outputSet.contains).toSeq, c)
+        } else {
+          c
+        }
+
+      Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))
+
+    case Project(projectList1, Project(projectList2, child)) =>
+      // Create a map of Aliases to their values from the child projection.
+      // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
+      val aliasMap = projectList2.collect {
+        case a @ Alias(e, _) => (a.toAttribute: Expression, a)
+      }.toMap
+
+      // Substitute any attributes that are produced by the child projection, so that we can
+      // safely eliminate it.
+      // e.g., 'SELECT c + 1 FROM (SELECT a + b AS c ...' produces 'SELECT a + b + 1 ...'
+      // TODO: Fix TransformBase to avoid the cast below.
+      val substitutedProjection = projectList1.map(_.transform {
+        case a if aliasMap.contains(a) => aliasMap(a)
+      }).asInstanceOf[Seq[NamedExpression]]
+
+      Project(substitutedProjection, child)
+  }
 }
 
 /**

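The Project-over-Project case above collapses nested projections by alias substitution. As a companion to the diff, here is a minimal, self-contained Scala sketch of that idea; the tiny Expr tree (Attr, Lit, Add, Alias) and the name AliasSubstitutionSketch are hypothetical stand-ins and not Catalyst's Expression API.

  object AliasSubstitutionSketch extends App {
    // Hypothetical mini expression tree standing in for Catalyst's Expression/NamedExpression.
    sealed trait Expr
    case class Attr(name: String) extends Expr
    case class Lit(value: Int) extends Expr
    case class Add(left: Expr, right: Expr) extends Expr
    case class Alias(child: Expr, name: String) extends Expr

    // Replace references to aliases defined by the inner projection with the aliased
    // expressions themselves, so the inner projection can be dropped entirely.
    def substitute(e: Expr, aliasMap: Map[String, Expr]): Expr = e match {
      case Attr(n) if aliasMap.contains(n) => aliasMap(n)
      case Add(l, r)   => Add(substitute(l, aliasMap), substitute(r, aliasMap))
      case Alias(c, n) => Alias(substitute(c, aliasMap), n)
      case other       => other
    }

    // Inner projection: SELECT a + b AS c.  Outer projection: SELECT c + 1 AS d.
    val inner = Seq(Alias(Add(Attr("a"), Attr("b")), "c"))
    val outer = Seq(Alias(Add(Attr("c"), Lit(1)), "d"))

    // Build the alias map from the inner projection, mirroring the collect in the rule above.
    val aliasMap: Map[String, Expr] =
      inner.collect { case Alias(child, name) => name -> child }.toMap

    // Prints List(Alias(Add(Add(Attr(a),Attr(b)),Lit(1)),d)): the two projections have
    // collapsed into one that reads the base attributes directly.
    println(outer.map(substitute(_, aliasMap)))
  }

In the real rule the map keys are the output Attributes of each Alias and the substitution runs through Catalyst's transform, but the rewrite has the same shape.
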
http://git-wip-us.apache.org/repos/asf/spark/blob/f99401a6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index cfc0b0c..397473e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -127,7 +127,7 @@ case class Aggregate(
   extends UnaryNode {
 
   def output = aggregateExpressions.map(_.toAttribute)
-  def references = child.references
+  def references = (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
 }
 
 case class Limit(limit: Expression, child: LogicalPlan) extends UnaryNode {
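
The one-line change to Aggregate.references above is what lets the new Aggregate case in ColumnPruning decide anything at all: references now lists exactly the attributes consumed by the grouping and aggregate expressions, so the rule can compare it with the child's output. Below is a minimal sketch of that set arithmetic; the name AggregatePruningSketch and the use of plain column-name strings in place of Catalyst Attribute objects are illustrative assumptions, not the real API.

  object AggregatePruningSketch extends App {
    // Columns produced by the child plan, e.g. a four-column table t(a, b, c, d).
    val childOutput = Set("a", "b", "c", "d")

    // For a query like SELECT a, SUM(b) FROM t GROUP BY a:
    val groupingRefs  = Set("a")       // attributes referenced by the grouping expressions
    val aggregateRefs = Set("a", "b")  // attributes referenced by the aggregate expressions

    // The new definition of references: everything the Aggregate actually reads.
    val references = groupingRefs ++ aggregateRefs

    // Mirrors the guard `(child.outputSet -- a.references).nonEmpty` in ColumnPruning:
    // if the child produces columns the aggregate never reads, a Project is worthwhile.
    val unneeded = childOutput -- references
    if (unneeded.nonEmpty) {
      // The rule rewrites Aggregate(child) into Aggregate(Project(references, child)),
      // so only columns a and b flow up from the child plan; c and d are pruned.
      println(s"prune: ${unneeded.toSeq.sorted.mkString(", ")}; keep: ${references.toSeq.sorted.mkString(", ")}")
    }
  }

Under the old definition the Aggregate merely reported whatever its child referenced, which says nothing about which of the child's output columns the aggregate itself needs, so no safe pruning Project could be derived from it.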