You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/05/12 20:58:43 UTC

spark git commit: [SPARK-7276] [DATAFRAME] speed up DataFrame.select by collapsing Project

Repository: spark
Updated Branches:
  refs/heads/master 65697bbea -> 4e290522c


[SPARK-7276] [DATAFRAME] speed up DataFrame.select by collapsing Project

Author: Wenchen Fan <cl...@outlook.com>

Closes #5831 from cloud-fan/7276 and squashes the following commits:

ee4a1e1 [Wenchen Fan] fix rebase mistake
a3b565d [Wenchen Fan] refactor
99deb5d [Wenchen Fan] add test
f1f67ad [Wenchen Fan] fix 7276


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e290522
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e290522
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e290522

Branch: refs/heads/master
Commit: 4e290522c2a6310636317c54589dc35c91d95486
Parents: 65697bb
Author: Wenchen Fan <cl...@outlook.com>
Authored: Tue May 12 11:51:55 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue May 12 11:51:55 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 40 ++++++++++++--------
 .../optimizer/FilterPushdownSuite.scala         |  3 +-
 .../scala/org/apache/spark/sql/DataFrame.scala  |  4 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 12 ++++++
 4 files changed, 41 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4e290522/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1ee5fb2..b163707 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -43,6 +43,7 @@ object DefaultOptimizer extends Optimizer {
       PushPredicateThroughJoin,
       PushPredicateThroughGenerate,
       ColumnPruning,
+      ProjectCollapsing,
       CombineLimits) ::
     Batch("ConstantFolding", FixedPoint(100),
       NullPropagation,
@@ -114,7 +115,7 @@ object UnionPushdown extends Rule[LogicalPlan] {
  *   - Aggregate
  *   - Project <- Join
  *   - LeftSemiJoin
- *  - Collapse adjacent projections, performing alias substitution.
+ *  - Performing alias substitution.
  */
 object ColumnPruning extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -152,7 +153,28 @@ object ColumnPruning extends Rule[LogicalPlan] {
 
       Join(left, prunedChild(right, allReferences), LeftSemi, condition)
 
-    // Combine adjacent Projects.
+    case Project(projectList, Limit(exp, child)) =>
+      Limit(exp, Project(projectList, child))
+
+    // Eliminate no-op Projects
+    case Project(projectList, child) if child.output == projectList => child
+  }
+
+  /** Applies a projection only when the child is producing unnecessary attributes */
+  private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) =
+    if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
+      Project(allReferences.filter(c.outputSet.contains).toSeq, c)
+    } else {
+      c
+    }
+}
+
+/**
+ * Combines two adjacent [[Project]] operators into one, merging the
+ * expressions into one single expression.
+ */
+object ProjectCollapsing extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case Project(projectList1, Project(projectList2, child)) =>
       // Create a map of Aliases to their values from the child projection.
       // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
@@ -169,21 +191,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
       }).asInstanceOf[Seq[NamedExpression]]
 
       Project(substitutedProjection, child)
-
-    case Project(projectList, Limit(exp, child)) =>
-      Limit(exp, Project(projectList, child))
-      
-    // Eliminate no-op Projects
-    case Project(projectList, child) if child.output == projectList => child
   }
-
-  /** Applies a projection only when the child is producing unnecessary attributes */
-  private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) =
-    if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
-      Project(allReferences.filter(c.outputSet.contains).toSeq, c)
-    } else {
-      c
-    }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/4e290522/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 58d415d..0c428f7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -38,7 +38,8 @@ class FilterPushdownSuite extends PlanTest {
         PushPredicateThroughProject,
         PushPredicateThroughJoin,
         PushPredicateThroughGenerate,
-        ColumnPruning) :: Nil
+        ColumnPruning,
+        ProjectCollapsing) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

http://git-wip-us.apache.org/repos/asf/spark/blob/4e290522/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index f3107f7..1f85dac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -567,7 +567,9 @@ class DataFrame private[sql](
       case Column(expr: NamedExpression) => expr
       case Column(expr: Expression) => Alias(expr, expr.prettyString)()
     }
-    Project(namedExpressions.toSeq, logicalPlan)
+    // When user continuously call `select`, speed up analysis by collapsing `Project`
+    import org.apache.spark.sql.catalyst.optimizer.ProjectCollapsing
+    Project(namedExpressions.toSeq, ProjectCollapsing(logicalPlan))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/4e290522/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d58438e..52aa1f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -493,4 +493,16 @@ class DataFrameSuite extends QueryTest {
       testData.dropDuplicates(Seq("value2")),
       Seq(Row(2, 1, 2), Row(1, 1, 1)))
   }
+
+  test("SPARK-7276: Project collapse for continuous select") {
+    var df = testData
+    for (i <- 1 to 5) {
+      df = df.select($"*")
+    }
+
+    import org.apache.spark.sql.catalyst.plans.logical.Project
+    // make sure df have at most two Projects
+    val p = df.logicalPlan.asInstanceOf[Project].child.asInstanceOf[Project]
+    assert(!p.child.isInstanceOf[Project])
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org