You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/02/04 20:08:54 UTC

spark git commit: [SPARK-13168][SQL] Collapse adjacent repartition operators

Repository: spark
Updated Branches:
  refs/heads/master 085f510ae -> 33212cb9a


[SPARK-13168][SQL] Collapse adjacent repartition operators

Spark SQL should collapse adjacent `Repartition` operators and only keep the last one.

Author: Josh Rosen <jo...@databricks.com>

Closes #11064 from JoshRosen/collapse-repartition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33212cb9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33212cb9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33212cb9

Branch: refs/heads/master
Commit: 33212cb9a13a6012b4c19ccfc0fb3db75de304da
Parents: 085f510
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Feb 4 11:08:50 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Feb 4 11:08:50 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 16 +++-
 .../optimizer/CollapseProjectSuite.scala        | 98 ++++++++++++++++++++
 .../optimizer/FilterPushdownSuite.scala         |  2 +-
 .../sql/catalyst/optimizer/JoinOrderSuite.scala |  2 +-
 .../optimizer/ProjectCollapsingSuite.scala      | 98 --------------------
 .../spark/sql/execution/PlannerSuite.scala      | 15 ++-
 .../org/apache/spark/sql/hive/SQLBuilder.scala  |  4 +-
 7 files changed, 129 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33212cb9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4ecee75..a1ac930 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -68,7 +68,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       PushPredicateThroughAggregate,
       ColumnPruning,
       // Operator combine
-      ProjectCollapsing,
+      CollapseRepartition,
+      CollapseProject,
       CombineFilters,
       CombineLimits,
       CombineUnions,
@@ -322,7 +323,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
  * Combines two adjacent [[Project]] operators into one and perform alias substitution,
  * merging the expressions into one single expression.
  */
-object ProjectCollapsing extends Rule[LogicalPlan] {
+object CollapseProject extends Rule[LogicalPlan] {
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case p @ Project(projectList1, Project(projectList2, child)) =>
@@ -391,6 +392,16 @@ object ProjectCollapsing extends Rule[LogicalPlan] {
 }
 
 /**
+ * Collapses a [[Repartition]] stacked directly on another one, keeping only the outer operator.
+ */
+object CollapseRepartition extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    // NOTE(review): the inner `shuffle` flag is never inspected; confirm that is intended.
+    case outer @ Repartition(_, _, inner: Repartition) => outer.copy(child = inner.child)
+  }
+}
+
+/**
  * Simplifies LIKE expressions that do not need full regular expressions to evaluate the condition.
  * For example, when the expression is just checking to see if a string starts with a given
  * pattern.
@@ -857,6 +868,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   /**
    * Splits join condition expressions into three categories based on the attributes required
    * to evaluate them.
+   *
    * @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
    */
   private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/33212cb9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
new file mode 100644
index 0000000..f5fd5ca
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Rand
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class CollapseProjectSuite extends PlanTest {
+  // Runs `EliminateSubQueries` to a fixed point (at most 10 passes), then `CollapseProject` once.
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("Subqueries", FixedPoint(10), EliminateSubQueries) ::
+      Batch("CollapseProject", Once, CollapseProject) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int)
+
+  test("collapse two deterministic, independent projects into one") {
+    // The outer select forwards `a_plus_1` and derives `b_plus_1` from the pass-through `b`.
+    val stacked = testRelation
+      .select(('a + 1).as('a_plus_1), 'b)
+      .select('a_plus_1, ('b + 1).as('b_plus_1))
+    val optimized = Optimize.execute(stacked.analyze)
+    val expected = testRelation.select(('a + 1).as('a_plus_1), ('b + 1).as('b_plus_1))
+
+    comparePlans(optimized, expected.analyze)
+  }
+
+  test("collapse two deterministic, dependent projects into one") {
+    // The outer select references the alias `a_plus_1` produced by the inner one.
+    val stacked = testRelation
+      .select(('a + 1).as('a_plus_1), 'b)
+      .select(('a_plus_1 + 1).as('a_plus_2), 'b)
+
+    val optimized = Optimize.execute(stacked.analyze)
+
+    // The expected plan keeps the inner alias nested inside the `a_plus_2` expression.
+    val expected = testRelation
+      .select((('a + 1).as('a_plus_1) + 1).as('a_plus_2), 'b)
+    comparePlans(optimized, expected.analyze)
+  }
+
+  test("do not collapse nondeterministic projects") {
+    // `rand` is a `Rand` alias that the outer select references twice.
+    val stacked = testRelation
+      .select(Rand(10).as('rand))
+      .select(('rand + 1).as('rand1), ('rand + 2).as('rand2))
+
+    val optimized = Optimize.execute(stacked.analyze)
+    // The optimizer is expected to hand back the analyzed input untouched.
+    comparePlans(optimized, stacked.analyze)
+  }
+
+  test("collapse two nondeterministic, independent projects into one") {
+    // The outer select never reads `rand`, so only its own `rand2` column is expected to remain.
+    val stacked = testRelation
+      .select(Rand(10).as('rand))
+      .select(Rand(20).as('rand2))
+
+    val optimized = Optimize.execute(stacked.analyze)
+
+    val expected = testRelation.select(Rand(20).as('rand2))
+
+    comparePlans(optimized, expected.analyze)
+  }
+
+  test("collapse one nondeterministic, one deterministic, independent projects into one") {
+    // The inner `rand` column is dropped by the outer select, which depends on `a` alone.
+    val stacked = testRelation
+      .select(Rand(10).as('rand), 'a)
+      .select(('a + 1).as('a_plus_1))
+
+    val optimized = Optimize.execute(stacked.analyze)
+
+    val expected = testRelation.select(('a + 1).as('a_plus_1))
+
+    comparePlans(optimized, expected.analyze)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/33212cb9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index f9f3bd5..b49ca92 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -42,7 +42,7 @@ class FilterPushdownSuite extends PlanTest {
         PushPredicateThroughGenerate,
         PushPredicateThroughAggregate,
         ColumnPruning,
-        ProjectCollapsing) :: Nil
+        CollapseProject) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

http://git-wip-us.apache.org/repos/asf/spark/blob/33212cb9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
index 9b1e16c..858a0d8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
@@ -43,7 +43,7 @@ class JoinOrderSuite extends PlanTest {
         PushPredicateThroughGenerate,
         PushPredicateThroughAggregate,
         ColumnPruning,
-        ProjectCollapsing) :: Nil
+        CollapseProject) :: Nil
 
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/33212cb9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ProjectCollapsingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ProjectCollapsingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ProjectCollapsingSuite.scala
deleted file mode 100644
index 85b6530..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ProjectCollapsingSuite.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.Rand
-import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
-
-class ProjectCollapsingSuite extends PlanTest {
-  object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches =
-      Batch("Subqueries", FixedPoint(10), EliminateSubQueries) ::
-        Batch("ProjectCollapsing", Once, ProjectCollapsing) :: Nil
-  }
-
-  val testRelation = LocalRelation('a.int, 'b.int)
-
-  test("collapse two deterministic, independent projects into one") {
-    val query = testRelation
-      .select(('a + 1).as('a_plus_1), 'b)
-      .select('a_plus_1, ('b + 1).as('b_plus_1))
-
-    val optimized = Optimize.execute(query.analyze)
-    val correctAnswer = testRelation.select(('a + 1).as('a_plus_1), ('b + 1).as('b_plus_1)).analyze
-
-    comparePlans(optimized, correctAnswer)
-  }
-
-  test("collapse two deterministic, dependent projects into one") {
-    val query = testRelation
-      .select(('a + 1).as('a_plus_1), 'b)
-      .select(('a_plus_1 + 1).as('a_plus_2), 'b)
-
-    val optimized = Optimize.execute(query.analyze)
-
-    val correctAnswer = testRelation.select(
-      (('a + 1).as('a_plus_1) + 1).as('a_plus_2),
-      'b).analyze
-
-    comparePlans(optimized, correctAnswer)
-  }
-
-  test("do not collapse nondeterministic projects") {
-    val query = testRelation
-      .select(Rand(10).as('rand))
-      .select(('rand + 1).as('rand1), ('rand + 2).as('rand2))
-
-    val optimized = Optimize.execute(query.analyze)
-    val correctAnswer = query.analyze
-
-    comparePlans(optimized, correctAnswer)
-  }
-
-  test("collapse two nondeterministic, independent projects into one") {
-    val query = testRelation
-      .select(Rand(10).as('rand))
-      .select(Rand(20).as('rand2))
-
-    val optimized = Optimize.execute(query.analyze)
-
-    val correctAnswer = testRelation
-      .select(Rand(20).as('rand2)).analyze
-
-    comparePlans(optimized, correctAnswer)
-  }
-
-  test("collapse one nondeterministic, one deterministic, independent projects into one") {
-    val query = testRelation
-      .select(Rand(10).as('rand), 'a)
-      .select(('a + 1).as('a_plus_1))
-
-    val optimized = Optimize.execute(query.analyze)
-
-    val correctAnswer = testRelation
-      .select(('a + 1).as('a_plus_1)).analyze
-
-    comparePlans(optimized, correctAnswer)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/33212cb9/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 8fca5e2..adaeb51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -21,8 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, Row, SQLConf}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal, SortOrder}
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
 import org.apache.spark.sql.functions._
@@ -223,6 +222,18 @@ class PlannerSuite extends SharedSQLContext {
     }
   }
 
+  test("collapse adjacent repartitions") {
+    val doubleRepartitioned = testData.repartition(10).repartition(20).coalesce(5)
+    def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: Repartition => r }.length
+    assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
+    assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 1)
+    // Only the outermost operator, coalesce(5), is expected to remain at the plan root.
+    doubleRepartitioned.queryExecution.optimizedPlan match {
+      case Repartition(numPartitions, shuffle, _) => assert(numPartitions === 5 && !shuffle)
+      case other => fail(s"Expected Repartition at the root of the optimized plan, got:\n$other")
+    }
+  }
+
   // --- Unit tests of EnsureRequirements ---------------------------------------------------------
 
   // When it comes to testing whether EnsureRequirements properly ensures distribution requirements,

http://git-wip-us.apache.org/repos/asf/spark/blob/33212cb9/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 1654594..fc5725d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -23,7 +23,7 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
-import org.apache.spark.sql.catalyst.optimizer.ProjectCollapsing
+import org.apache.spark.sql.catalyst.optimizer.CollapseProject
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -188,7 +188,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         // The `WidenSetOperationTypes` analysis rule may introduce extra `Project`s over
         // `Aggregate`s to perform type casting.  This rule merges these `Project`s into
         // `Aggregate`s.
-        ProjectCollapsing,
+        CollapseProject,
 
         // Used to handle other auxiliary `Project`s added by analyzer (e.g.
         // `ResolveAggregateFunctions` rule)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org