Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/24 21:03:50 UTC
spark git commit: Revert "Revert "[SPARK-13383][SQL] Keep broadcast hint after column pruning""
Repository: spark
Updated Branches:
refs/heads/master d563c8fa0 -> 65805ab6e
Revert "Revert "[SPARK-13383][SQL] Keep broadcast hint after column pruning""
This reverts commit 382b27babf7771b724f7abff78195a858631d138.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65805ab6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65805ab6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65805ab6
Branch: refs/heads/master
Commit: 65805ab6eaa6b0ed1dc7f6cbd3c3368eb750b375
Parents: d563c8f
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Feb 24 12:03:45 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Feb 24 12:03:45 2016 -0800
----------------------------------------------------------------------
.../catalyst/plans/logical/basicOperators.scala | 4 +
.../optimizer/JoinOptimizationSuite.scala | 122 +++++++++++++++++++
.../sql/catalyst/optimizer/JoinOrderSuite.scala | 95 ---------------
.../spark/sql/execution/SparkStrategies.scala | 12 +-
4 files changed, 133 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
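For context, the broadcast hint this change preserves is the one users request through org.apache.spark.sql.functions.broadcast, which wraps the marked side of a join in a BroadcastHint logical node. A minimal sketch of the user-facing API (largeDF and smallDF are hypothetical DataFrames, not part of this commit):

    import org.apache.spark.sql.functions.broadcast

    // Mark the smaller side for broadcast; the planner inserts a BroadcastHint
    // node above it, and SPARK-13383 keeps that hint in place after column pruning.
    val joined = largeDF.join(broadcast(smallDF), largeDF("key") === smallDF("key"))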
http://git-wip-us.apache.org/repos/asf/spark/blob/65805ab6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index af43cb3..5d2a65b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -332,6 +332,10 @@ case class Join(
*/
case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
+
+ // We manually set the statistics of BroadcastHint to the smallest possible value
+ // so that the plan wrapped by BroadcastHint will be considered for broadcast later.
+ override def statistics: Statistics = Statistics(sizeInBytes = 1)
}
case class InsertIntoTable(
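The override above is the core of the fix: a BroadcastHint now reports a size of one byte regardless of what its (possibly pruned) child reports, so any size-based broadcast check will select it. A rough sketch of the resulting comparison, assuming a hypothetical 10 MB threshold and a placeholder child plan:

    // Illustration only: BroadcastHint reports sizeInBytes = 1 after this patch.
    val thresholdBytes = 10L * 1024 * 1024       // assumed value of spark.sql.autoBroadcastJoinThreshold
    val hinted = BroadcastHint(childPlan)         // childPlan is a placeholder LogicalPlan
    val willBroadcast = hinted.statistics.sizeInBytes <= thresholdBytes  // true whenever the threshold is positive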
http://git-wip-us.apache.org/repos/asf/spark/blob/65805ab6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
new file mode 100644
index 0000000..d482519
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+
+class JoinOptimizationSuite extends PlanTest {
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("Subqueries", Once,
+ EliminateSubqueryAliases) ::
+ Batch("Filter Pushdown", FixedPoint(100),
+ CombineFilters,
+ PushPredicateThroughProject,
+ BooleanSimplification,
+ ReorderJoin,
+ PushPredicateThroughJoin,
+ PushPredicateThroughGenerate,
+ PushPredicateThroughAggregate,
+ ColumnPruning,
+ CollapseProject) :: Nil
+
+ }
+
+ val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+ val testRelation1 = LocalRelation('d.int)
+
+ test("extract filters and joins") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val z = testRelation.subquery('z)
+
+ def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) {
+ assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
+ }
+
+ testExtract(x, None)
+ testExtract(x.where("x.b".attr === 1), None)
+ testExtract(x.join(y), Some(Seq(x, y), Seq()))
+ testExtract(x.join(y, condition = Some("x.b".attr === "y.d".attr)),
+ Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
+ testExtract(x.join(y).where("x.b".attr === "y.d".attr),
+ Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
+ testExtract(x.join(y).join(z), Some(Seq(x, y, z), Seq()))
+ testExtract(x.join(y).where("x.b".attr === "y.d".attr).join(z),
+ Some(Seq(x, y, z), Seq("x.b".attr === "y.d".attr)))
+ testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq()))
+ testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
+ Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr)))
+ }
+
+ test("reorder inner joins") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val z = testRelation.subquery('z)
+
+ val originalQuery = {
+ x.join(y).join(z)
+ .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
+ }
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ x.join(z, condition = Some("x.b".attr === "z.b".attr))
+ .join(y, condition = Some("y.d".attr === "z.a".attr))
+ .analyze
+
+ comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+ }
+
+ test("broadcasthint sets relation statistics to smallest value") {
+ val input = LocalRelation('key.int, 'value.string)
+
+ val query =
+ Project(Seq($"x.key", $"y.key"),
+ Join(
+ SubqueryAlias("x", input),
+ BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
+
+ val optimized = Optimize.execute(query)
+
+ val expected =
+ Project(Seq($"x.key", $"y.key"),
+ Join(
+ Project(Seq($"x.key"), SubqueryAlias("x", input)),
+ BroadcastHint(
+ Project(Seq($"y.key"), SubqueryAlias("y", input))),
+ Inner, None)).analyze
+
+ comparePlans(optimized, expected)
+
+ val broadcastChildren = optimized.collect {
+ case Join(_, r, _, _) if r.statistics.sizeInBytes == 1 => r
+ }
+ assert(broadcastChildren.size == 1)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/65805ab6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
deleted file mode 100644
index a5b487b..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
-import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
-
-
-class JoinOrderSuite extends PlanTest {
-
- object Optimize extends RuleExecutor[LogicalPlan] {
- val batches =
- Batch("Subqueries", Once,
- EliminateSubqueryAliases) ::
- Batch("Filter Pushdown", Once,
- CombineFilters,
- PushPredicateThroughProject,
- BooleanSimplification,
- ReorderJoin,
- PushPredicateThroughJoin,
- PushPredicateThroughGenerate,
- PushPredicateThroughAggregate,
- ColumnPruning,
- CollapseProject) :: Nil
-
- }
-
- val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
- val testRelation1 = LocalRelation('d.int)
-
- test("extract filters and joins") {
- val x = testRelation.subquery('x)
- val y = testRelation1.subquery('y)
- val z = testRelation.subquery('z)
-
- def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) {
- assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
- }
-
- testExtract(x, None)
- testExtract(x.where("x.b".attr === 1), None)
- testExtract(x.join(y), Some(Seq(x, y), Seq()))
- testExtract(x.join(y, condition = Some("x.b".attr === "y.d".attr)),
- Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
- testExtract(x.join(y).where("x.b".attr === "y.d".attr),
- Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
- testExtract(x.join(y).join(z), Some(Seq(x, y, z), Seq()))
- testExtract(x.join(y).where("x.b".attr === "y.d".attr).join(z),
- Some(Seq(x, y, z), Seq("x.b".attr === "y.d".attr)))
- testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq()))
- testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
- Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr)))
- }
-
- test("reorder inner joins") {
- val x = testRelation.subquery('x)
- val y = testRelation1.subquery('y)
- val z = testRelation.subquery('z)
-
- val originalQuery = {
- x.join(y).join(z)
- .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
- }
-
- val optimized = Optimize.execute(originalQuery.analyze)
- val correctAnswer =
- x.join(z, condition = Some("x.b".attr === "z.b".attr))
- .join(y, condition = Some("y.d".attr === "z.a".attr))
- .analyze
-
- comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/65805ab6/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7347156..247eb05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -81,11 +81,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* Matches a plan whose output should be small enough to be used in broadcast join.
*/
object CanBroadcast {
- def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
- case BroadcastHint(p) => Some(p)
- case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
- p.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => Some(p)
- case _ => None
+ def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
+ if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+ plan.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
+ Some(plan)
+ } else {
+ None
+ }
}
}
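With the hint's statistics pinned to one byte, CanBroadcast no longer needs to special-case BroadcastHint: the ordinary threshold comparison now accepts hinted plans whenever spark.sql.autoBroadcastJoinThreshold is positive, and the two code paths collapse into one. A simplified sketch of how a join strategy consumes the extractor (not the actual SparkStrategies code; CanBroadcast is an object nested inside SparkStrategies, and shouldBroadcastRight is a hypothetical helper):

    import org.apache.spark.sql.catalyst.plans.Inner
    import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

    // Returns true when the right side of an inner join qualifies for broadcast,
    // either via an explicit BroadcastHint (size 1) or a small estimated size.
    def shouldBroadcastRight(plan: LogicalPlan): Boolean = plan match {
      case Join(_, CanBroadcast(_), Inner, _) => true
      case _ => false
    }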