You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/24 21:02:18 UTC
[1/2] spark git commit: Revert "[SPARK-13383][SQL] Keep broadcast
hint after column pruning"
Repository: spark
Updated Branches:
refs/heads/master f37398699 -> d563c8fa0
Revert "[SPARK-13383][SQL] Keep broadcast hint after column pruning"
This reverts commit f3739869973ba4285196a61775d891292b8e282b.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/382b27ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/382b27ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/382b27ba
Branch: refs/heads/master
Commit: 382b27babf7771b724f7abff78195a858631d138
Parents: f373986
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Feb 24 11:58:12 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Feb 24 11:58:12 2016 -0800
----------------------------------------------------------------------
.../catalyst/plans/logical/basicOperators.scala | 4 -
.../optimizer/JoinOptimizationSuite.scala | 122 -------------------
.../sql/catalyst/optimizer/JoinOrderSuite.scala | 95 +++++++++++++++
.../spark/sql/execution/SparkStrategies.scala | 12 +-
4 files changed, 100 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/382b27ba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 5d2a65b..af43cb3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -332,10 +332,6 @@ case class Join(
*/
case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
-
- // We manually set statistics of BroadcastHint to smallest value to make sure
- // the plan wrapped by BroadcastHint will be considered to broadcast later.
- override def statistics: Statistics = Statistics(sizeInBytes = 1)
}
case class InsertIntoTable(
http://git-wip-us.apache.org/repos/asf/spark/blob/382b27ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
deleted file mode 100644
index d482519..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
-import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
-
-
-class JoinOptimizationSuite extends PlanTest {
-
- object Optimize extends RuleExecutor[LogicalPlan] {
- val batches =
- Batch("Subqueries", Once,
- EliminateSubqueryAliases) ::
- Batch("Filter Pushdown", FixedPoint(100),
- CombineFilters,
- PushPredicateThroughProject,
- BooleanSimplification,
- ReorderJoin,
- PushPredicateThroughJoin,
- PushPredicateThroughGenerate,
- PushPredicateThroughAggregate,
- ColumnPruning,
- CollapseProject) :: Nil
-
- }
-
- val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
- val testRelation1 = LocalRelation('d.int)
-
- test("extract filters and joins") {
- val x = testRelation.subquery('x)
- val y = testRelation1.subquery('y)
- val z = testRelation.subquery('z)
-
- def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) {
- assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
- }
-
- testExtract(x, None)
- testExtract(x.where("x.b".attr === 1), None)
- testExtract(x.join(y), Some(Seq(x, y), Seq()))
- testExtract(x.join(y, condition = Some("x.b".attr === "y.d".attr)),
- Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
- testExtract(x.join(y).where("x.b".attr === "y.d".attr),
- Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
- testExtract(x.join(y).join(z), Some(Seq(x, y, z), Seq()))
- testExtract(x.join(y).where("x.b".attr === "y.d".attr).join(z),
- Some(Seq(x, y, z), Seq("x.b".attr === "y.d".attr)))
- testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq()))
- testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
- Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr)))
- }
-
- test("reorder inner joins") {
- val x = testRelation.subquery('x)
- val y = testRelation1.subquery('y)
- val z = testRelation.subquery('z)
-
- val originalQuery = {
- x.join(y).join(z)
- .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
- }
-
- val optimized = Optimize.execute(originalQuery.analyze)
- val correctAnswer =
- x.join(z, condition = Some("x.b".attr === "z.b".attr))
- .join(y, condition = Some("y.d".attr === "z.a".attr))
- .analyze
-
- comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
- }
-
- test("broadcasthint sets relation statistics to smallest value") {
- val input = LocalRelation('key.int, 'value.string)
-
- val query =
- Project(Seq($"x.key", $"y.key"),
- Join(
- SubqueryAlias("x", input),
- BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
-
- val optimized = Optimize.execute(query)
-
- val expected =
- Project(Seq($"x.key", $"y.key"),
- Join(
- Project(Seq($"x.key"), SubqueryAlias("x", input)),
- BroadcastHint(
- Project(Seq($"y.key"), SubqueryAlias("y", input))),
- Inner, None)).analyze
-
- comparePlans(optimized, expected)
-
- val broadcastChildren = optimized.collect {
- case Join(_, r, _, _) if r.statistics.sizeInBytes == 1 => r
- }
- assert(broadcastChildren.size == 1)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/382b27ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
new file mode 100644
index 0000000..a5b487b
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+
+class JoinOrderSuite extends PlanTest {
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("Subqueries", Once,
+ EliminateSubqueryAliases) ::
+ Batch("Filter Pushdown", Once,
+ CombineFilters,
+ PushPredicateThroughProject,
+ BooleanSimplification,
+ ReorderJoin,
+ PushPredicateThroughJoin,
+ PushPredicateThroughGenerate,
+ PushPredicateThroughAggregate,
+ ColumnPruning,
+ CollapseProject) :: Nil
+
+ }
+
+ val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+ val testRelation1 = LocalRelation('d.int)
+
+ test("extract filters and joins") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val z = testRelation.subquery('z)
+
+ def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) {
+ assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
+ }
+
+ testExtract(x, None)
+ testExtract(x.where("x.b".attr === 1), None)
+ testExtract(x.join(y), Some(Seq(x, y), Seq()))
+ testExtract(x.join(y, condition = Some("x.b".attr === "y.d".attr)),
+ Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
+ testExtract(x.join(y).where("x.b".attr === "y.d".attr),
+ Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
+ testExtract(x.join(y).join(z), Some(Seq(x, y, z), Seq()))
+ testExtract(x.join(y).where("x.b".attr === "y.d".attr).join(z),
+ Some(Seq(x, y, z), Seq("x.b".attr === "y.d".attr)))
+ testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq()))
+ testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
+ Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr)))
+ }
+
+ test("reorder inner joins") {
+ val x = testRelation.subquery('x)
+ val y = testRelation1.subquery('y)
+ val z = testRelation.subquery('z)
+
+ val originalQuery = {
+ x.join(y).join(z)
+ .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
+ }
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ x.join(z, condition = Some("x.b".attr === "z.b".attr))
+ .join(y, condition = Some("y.d".attr === "z.a".attr))
+ .analyze
+
+ comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/382b27ba/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 247eb05..7347156 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -81,13 +81,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* Matches a plan whose output should be small enough to be used in broadcast join.
*/
object CanBroadcast {
- def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
- if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
- plan.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
- Some(plan)
- } else {
- None
- }
+ def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
+ case BroadcastHint(p) => Some(p)
+ case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+ p.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => Some(p)
+ case _ => None
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org
[2/2] spark git commit: Revert "[SPARK-13376] [SQL] improve column
pruning"
Posted by rx...@apache.org.
Revert "[SPARK-13376] [SQL] improve column pruning"
This reverts commit e9533b419e3a87589313350310890ce0caf73dbb.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d563c8fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d563c8fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d563c8fa
Branch: refs/heads/master
Commit: d563c8fa01cfaebb5899ff7970115d0f2e64e8d5
Parents: 382b27b
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Feb 24 11:58:32 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Feb 24 11:58:32 2016 -0800
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 128 ++++++++++---------
.../catalyst/optimizer/ColumnPruningSuite.scala | 128 +------------------
.../optimizer/FilterPushdownSuite.scala | 80 ++++++++++++
.../columnar/InMemoryColumnarTableScan.scala | 7 +-
4 files changed, 156 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d563c8fa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2b80497..1f05f20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -313,85 +313,97 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
*/
object ColumnPruning extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- // Prunes the unused columns from project list of Project/Aggregate/Window/Expand
- case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
- p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains)))
- case p @ Project(_, a: Aggregate) if (a.outputSet -- p.references).nonEmpty =>
- p.copy(
- child = a.copy(aggregateExpressions = a.aggregateExpressions.filter(p.references.contains)))
- case p @ Project(_, w: Window) if (w.outputSet -- p.references).nonEmpty =>
- p.copy(child = w.copy(
- projectList = w.projectList.filter(p.references.contains),
- windowExpressions = w.windowExpressions.filter(p.references.contains)))
- case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- a.references).nonEmpty =>
- val newOutput = e.output.filter(a.references.contains(_))
- val newProjects = e.projections.map { proj =>
- proj.zip(e.output).filter { case (e, a) =>
+ case a @ Aggregate(_, _, e @ Expand(projects, output, child))
+ if (e.outputSet -- a.references).nonEmpty =>
+ val newOutput = output.filter(a.references.contains(_))
+ val newProjects = projects.map { proj =>
+ proj.zip(output).filter { case (e, a) =>
newOutput.contains(a)
}.unzip._1
}
- a.copy(child = Expand(newProjects, newOutput, grandChild))
- // TODO: support some logical plan for Dataset
+ a.copy(child = Expand(newProjects, newOutput, child))
- // Prunes the unused columns from child of Aggregate/Window/Expand/Generate
+ case a @ Aggregate(_, _, e @ Expand(_, _, child))
+ if (child.outputSet -- e.references -- a.references).nonEmpty =>
+ a.copy(child = e.copy(child = prunedChild(child, e.references ++ a.references)))
+
+ // Eliminate attributes that are not needed to calculate the specified aggregates.
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
- a.copy(child = prunedChild(child, a.references))
- case w @ Window(_, _, _, _, child) if (child.outputSet -- w.references).nonEmpty =>
- w.copy(child = prunedChild(child, w.references))
- case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty =>
- e.copy(child = prunedChild(child, e.references))
+ a.copy(child = Project(a.references.toSeq, child))
+
+ // Eliminate attributes that are not needed to calculate the Generate.
case g: Generate if !g.join && (g.child.outputSet -- g.references).nonEmpty =>
- g.copy(child = prunedChild(g.child, g.references))
+ g.copy(child = Project(g.references.toSeq, g.child))
- // Turn off `join` for Generate if no column from it's child is used
case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) =>
p.copy(child = g.copy(join = false))
- // Eliminate unneeded attributes from right side of a LeftSemiJoin.
- case j @ Join(left, right, LeftSemi, condition) =>
- j.copy(right = prunedChild(right, j.references))
-
- // all the columns will be used to compare, so we can't prune them
- case p @ Project(_, _: SetOperation) => p
- case p @ Project(_, _: Distinct) => p
- // Eliminate unneeded attributes from children of Union.
- case p @ Project(_, u: Union) =>
- if ((u.outputSet -- p.references).nonEmpty) {
- val firstChild = u.children.head
- val newOutput = prunedChild(firstChild, p.references).output
- // pruning the columns of all children based on the pruned first child.
- val newChildren = u.children.map { p =>
- val selected = p.output.zipWithIndex.filter { case (a, i) =>
- newOutput.contains(firstChild.output(i))
- }.map(_._1)
- Project(selected, p)
- }
- p.copy(child = u.withNewChildren(newChildren))
- } else {
+ case p @ Project(projectList, g: Generate) if g.join =>
+ val neededChildOutput = p.references -- g.generatorOutput ++ g.references
+ if (neededChildOutput == g.child.outputSet) {
p
+ } else {
+ Project(projectList, g.copy(child = Project(neededChildOutput.toSeq, g.child)))
}
- // Can't prune the columns on LeafNode
- case p @ Project(_, l: LeafNode) => p
+ case p @ Project(projectList, a @ Aggregate(groupingExpressions, aggregateExpressions, child))
+ if (a.outputSet -- p.references).nonEmpty =>
+ Project(
+ projectList,
+ Aggregate(
+ groupingExpressions,
+ aggregateExpressions.filter(e => p.references.contains(e)),
+ child))
- // Eliminate no-op Projects
- case p @ Project(projectList, child) if child.output == p.output => child
-
- // for all other logical plans that inherits the output from it's children
- case p @ Project(_, child) =>
- val required = child.references ++ p.references
- if ((child.inputSet -- required).nonEmpty) {
- val newChildren = child.children.map(c => prunedChild(c, required))
- p.copy(child = child.withNewChildren(newChildren))
+ // Eliminate unneeded attributes from either side of a Join.
+ case Project(projectList, Join(left, right, joinType, condition)) =>
+ // Collect the list of all references required either above or to evaluate the condition.
+ val allReferences: AttributeSet =
+ AttributeSet(
+ projectList.flatMap(_.references.iterator)) ++
+ condition.map(_.references).getOrElse(AttributeSet(Seq.empty))
+
+ /** Applies a projection only when the child is producing unnecessary attributes */
+ def pruneJoinChild(c: LogicalPlan): LogicalPlan = prunedChild(c, allReferences)
+
+ Project(projectList, Join(pruneJoinChild(left), pruneJoinChild(right), joinType, condition))
+
+ // Eliminate unneeded attributes from right side of a LeftSemiJoin.
+ case Join(left, right, LeftSemi, condition) =>
+ // Collect the list of all references required to evaluate the condition.
+ val allReferences: AttributeSet =
+ condition.map(_.references).getOrElse(AttributeSet(Seq.empty))
+
+ Join(left, prunedChild(right, allReferences), LeftSemi, condition)
+
+ // Push down project through limit, so that we may have chance to push it further.
+ case Project(projectList, Limit(exp, child)) =>
+ Limit(exp, Project(projectList, child))
+
+ // Push down project if possible when the child is sort.
+ case p @ Project(projectList, s @ Sort(_, _, grandChild)) =>
+ if (s.references.subsetOf(p.outputSet)) {
+ s.copy(child = Project(projectList, grandChild))
} else {
- p
+ val neededReferences = s.references ++ p.references
+ if (neededReferences == grandChild.outputSet) {
+ // No column we can prune, return the original plan.
+ p
+ } else {
+ // Do not use neededReferences.toSeq directly, should respect grandChild's output order.
+ val newProjectList = grandChild.output.filter(neededReferences.contains)
+ p.copy(child = s.copy(child = Project(newProjectList, grandChild)))
+ }
}
+
+ // Eliminate no-op Projects
+ case Project(projectList, child) if child.output == projectList => child
}
/** Applies a projection only when the child is producing unnecessary attributes */
private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) =
if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
- Project(c.output.filter(allReferences.contains), c)
+ Project(allReferences.filter(c.outputSet.contains).toSeq, c)
} else {
c
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d563c8fa/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 715d01a..c890fff 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -17,10 +17,9 @@
package org.apache.spark.sql.catalyst.optimizer
-import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Explode, Literal, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Explode, Literal}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -120,134 +119,11 @@ class ColumnPruningSuite extends PlanTest {
Seq('c, Literal.create(null, StringType), 1),
Seq('c, 'a, 2)),
Seq('c, 'aa.int, 'gid.int),
- Project(Seq('a, 'c),
+ Project(Seq('c, 'a),
input))).analyze
comparePlans(optimized, expected)
}
- test("Column pruning on Filter") {
- val input = LocalRelation('a.int, 'b.string, 'c.double)
- val query = Project('a :: Nil, Filter('c > Literal(0.0), input)).analyze
- val expected =
- Project('a :: Nil,
- Filter('c > Literal(0.0),
- Project(Seq('a, 'c), input))).analyze
- comparePlans(Optimize.execute(query), expected)
- }
-
- test("Column pruning on except/intersect/distinct") {
- val input = LocalRelation('a.int, 'b.string, 'c.double)
- val query = Project('a :: Nil, Except(input, input)).analyze
- comparePlans(Optimize.execute(query), query)
-
- val query2 = Project('a :: Nil, Intersect(input, input)).analyze
- comparePlans(Optimize.execute(query2), query2)
- val query3 = Project('a :: Nil, Distinct(input)).analyze
- comparePlans(Optimize.execute(query3), query3)
- }
-
- test("Column pruning on Project") {
- val input = LocalRelation('a.int, 'b.string, 'c.double)
- val query = Project('a :: Nil, Project(Seq('a, 'b), input)).analyze
- val expected = Project(Seq('a), input).analyze
- comparePlans(Optimize.execute(query), expected)
- }
-
- test("column pruning for group") {
- val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
- val originalQuery =
- testRelation
- .groupBy('a)('a, count('b))
- .select('a)
-
- val optimized = Optimize.execute(originalQuery.analyze)
- val correctAnswer =
- testRelation
- .select('a)
- .groupBy('a)('a).analyze
-
- comparePlans(optimized, correctAnswer)
- }
-
- test("column pruning for group with alias") {
- val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
-
- val originalQuery =
- testRelation
- .groupBy('a)('a as 'c, count('b))
- .select('c)
-
- val optimized = Optimize.execute(originalQuery.analyze)
- val correctAnswer =
- testRelation
- .select('a)
- .groupBy('a)('a as 'c).analyze
-
- comparePlans(optimized, correctAnswer)
- }
-
- test("column pruning for Project(ne, Limit)") {
- val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
-
- val originalQuery =
- testRelation
- .select('a, 'b)
- .limit(2)
- .select('a)
-
- val optimized = Optimize.execute(originalQuery.analyze)
- val correctAnswer =
- testRelation
- .select('a)
- .limit(2).analyze
-
- comparePlans(optimized, correctAnswer)
- }
-
- test("push down project past sort") {
- val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
- val x = testRelation.subquery('x)
-
- // push down valid
- val originalQuery = {
- x.select('a, 'b)
- .sortBy(SortOrder('a, Ascending))
- .select('a)
- }
-
- val optimized = Optimize.execute(originalQuery.analyze)
- val correctAnswer =
- x.select('a)
- .sortBy(SortOrder('a, Ascending)).analyze
-
- comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
-
- // push down invalid
- val originalQuery1 = {
- x.select('a, 'b)
- .sortBy(SortOrder('a, Ascending))
- .select('b)
- }
-
- val optimized1 = Optimize.execute(originalQuery1.analyze)
- val correctAnswer1 =
- x.select('a, 'b)
- .sortBy(SortOrder('a, Ascending))
- .select('b).analyze
-
- comparePlans(optimized1, analysis.EliminateSubqueryAliases(correctAnswer1))
- }
-
- test("Column pruning on Union") {
- val input1 = LocalRelation('a.int, 'b.string, 'c.double)
- val input2 = LocalRelation('c.int, 'd.string, 'e.double)
- val query = Project('b :: Nil,
- Union(input1 :: input2 :: Nil)).analyze
- val expected = Project('b :: Nil,
- Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil)).analyze
- comparePlans(Optimize.execute(query), expected)
- }
-
// todo: add more tests for column pruning
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d563c8fa/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 7d60862..70b34cb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -41,6 +41,7 @@ class FilterPushdownSuite extends PlanTest {
PushPredicateThroughJoin,
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
+ ColumnPruning,
CollapseProject) :: Nil
}
@@ -64,6 +65,52 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+ test("column pruning for group") {
+ val originalQuery =
+ testRelation
+ .groupBy('a)('a, count('b))
+ .select('a)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ testRelation
+ .select('a)
+ .groupBy('a)('a).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("column pruning for group with alias") {
+ val originalQuery =
+ testRelation
+ .groupBy('a)('a as 'c, count('b))
+ .select('c)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ testRelation
+ .select('a)
+ .groupBy('a)('a as 'c).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("column pruning for Project(ne, Limit)") {
+ val originalQuery =
+ testRelation
+ .select('a, 'b)
+ .limit(2)
+ .select('a)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ testRelation
+ .select('a)
+ .limit(2).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
// After this line is unimplemented.
test("simple push down") {
val originalQuery =
@@ -557,6 +604,39 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, originalQuery)
}
+ test("push down project past sort") {
+ val x = testRelation.subquery('x)
+
+ // push down valid
+ val originalQuery = {
+ x.select('a, 'b)
+ .sortBy(SortOrder('a, Ascending))
+ .select('a)
+ }
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val correctAnswer =
+ x.select('a)
+ .sortBy(SortOrder('a, Ascending)).analyze
+
+ comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+
+ // push down invalid
+ val originalQuery1 = {
+ x.select('a, 'b)
+ .sortBy(SortOrder('a, Ascending))
+ .select('b)
+ }
+
+ val optimized1 = Optimize.execute(originalQuery1.analyze)
+ val correctAnswer1 =
+ x.select('a, 'b)
+ .sortBy(SortOrder('a, Ascending))
+ .select('b).analyze
+
+ comparePlans(optimized1, analysis.EliminateSubqueryAliases(correctAnswer1))
+ }
+
test("push project and filter down into sample") {
val x = testRelation.subquery('x)
val originalQuery =
http://git-wip-us.apache.org/repos/asf/spark/blob/d563c8fa/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
index 22d4278..4858140 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
@@ -26,8 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
@@ -64,7 +63,7 @@ private[sql] case class InMemoryRelation(
@transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null,
@transient private[sql] var _statistics: Statistics = null,
private[sql] var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
- extends logical.LeafNode with MultiInstanceRelation {
+ extends LogicalPlan with MultiInstanceRelation {
override def producedAttributes: AttributeSet = outputSet
@@ -185,6 +184,8 @@ private[sql] case class InMemoryRelation(
_cachedColumnBuffers, statisticsToBePropagated, batchStats)
}
+ override def children: Seq[LogicalPlan] = Seq.empty
+
override def newInstance(): this.type = {
new InMemoryRelation(
output.map(_.newInstance()),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org