You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/03/17 00:26:54 UTC
spark git commit: [SPARK-13871][SQL] Support for inferring filters
from data constraints
Repository: spark
Updated Branches:
refs/heads/master b90c0206f -> f96997ba2
[SPARK-13871][SQL] Support for inferring filters from data constraints
## What changes were proposed in this pull request?
This PR generalizes the `NullFiltering` optimizer rule in catalyst to `InferFiltersFromConstraints` that can automatically infer all relevant filters based on an operator's constraints while making sure of 2 things:
(a) no redundant filters are generated, and
(b) filters that do not contribute to any further optimizations are not generated.
## How was this patch tested?
Extended all tests in `InferFiltersFromConstraintsSuite` (that were initially based on `NullFilteringSuite`) to test filter inference in `Filter` and `Join` operators.
In particular, the 2 tests (`single inner join with pre-existing filters: filter out values on either side` and `multiple inner joins: filter out values on all sides on equi-join keys`) attempt to highlight/test the real potential of this rule for join optimization.
Author: Sameer Agarwal <sa...@databricks.com>
Closes #11665 from sameeragarwal/infer-filters.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f96997ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f96997ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f96997ba
Branch: refs/heads/master
Commit: f96997ba244a14c26e85a2475415a762d0c0d0a8
Parents: b90c020
Author: Sameer Agarwal <sa...@databricks.com>
Authored: Wed Mar 16 16:26:51 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Mar 16 16:26:51 2016 -0700
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 56 ++++-----
.../InferFiltersFromConstraintsSuite.scala | 123 +++++++++++++++++++
.../catalyst/optimizer/NullFilteringSuite.scala | 112 -----------------
3 files changed, 146 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f96997ba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2de92d0..76f50a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -72,6 +72,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
LimitPushDown,
ColumnPruning,
EliminateOperators,
+ InferFiltersFromConstraints,
// Operator combine
CollapseRepartition,
CollapseProject,
@@ -79,7 +80,6 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
CombineLimits,
CombineUnions,
// Constant folding and strength reduction
- NullFiltering,
NullPropagation,
OptimizeIn,
ConstantFolding,
@@ -607,50 +607,40 @@ object NullPropagation extends Rule[LogicalPlan] {
}
/**
- * Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness
- * by inserting isNotNull filters in the query plan. These filters are currently inserted beneath
- * existing Filters and Join operators and are inferred based on their data constraints.
+ * Generate a list of additional filters from an operator's existing constraint but remove those
+ * that are either already part of the operator's condition or are part of the operator's child
+ * constraints. These filters are currently inserted to the existing conditions in the Filter
+ * operators and on either side of Join operators.
*
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
* LeftSemi joins.
*/
-object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
+object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child) =>
- // We generate a list of additional isNotNull filters from the operator's existing constraints
- // but remove those that are either already part of the filter condition or are part of the
- // operator's child constraints.
- val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
+ val newFilters = filter.constraints --
(child.constraints ++ splitConjunctivePredicates(condition))
- if (newIsNotNullConstraints.nonEmpty) {
- Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
+ if (newFilters.nonEmpty) {
+ Filter(And(newFilters.reduce(And), condition), child)
} else {
filter
}
- case join @ Join(left, right, joinType, condition) =>
- val leftIsNotNullConstraints = join.constraints
- .filter(_.isInstanceOf[IsNotNull])
- .filter(_.references.subsetOf(left.outputSet)) -- left.constraints
- val rightIsNotNullConstraints =
- join.constraints
- .filter(_.isInstanceOf[IsNotNull])
- .filter(_.references.subsetOf(right.outputSet)) -- right.constraints
- val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
- Filter(leftIsNotNullConstraints.reduce(And), left)
- } else {
- left
- }
- val newRightChild = if (rightIsNotNullConstraints.nonEmpty) {
- Filter(rightIsNotNullConstraints.reduce(And), right)
- } else {
- right
- }
- if (newLeftChild != left || newRightChild != right) {
- Join(newLeftChild, newRightChild, joinType, condition)
- } else {
- join
+ case join @ Join(left, right, joinType, conditionOpt) =>
+ // Only consider constraints that can be pushed down completely to either the left or the
+ // right child
+ val constraints = join.constraints.filter { c =>
+ c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)}
+ // Remove those constraints that are already enforced by either the left or the right child
+ val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
+ val newConditionOpt = conditionOpt match {
+ case Some(condition) =>
+ val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
+ if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+ case None =>
+ additionalConstraints.reduceOption(And)
}
+ if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f96997ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
new file mode 100644
index 0000000..e7fdd5a
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+class InferFiltersFromConstraintsSuite extends PlanTest {
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches = Batch("InferFilters", FixedPoint(5), InferFiltersFromConstraints) ::
+ Batch("PredicatePushdown", FixedPoint(5), PushPredicateThroughJoin) ::
+ Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
+ }
+
+ val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+ test("filter: filter out constraints in condition") {
+ val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze
+ val correctAnswer = testRelation
+ .where(IsNotNull('a) && IsNotNull('b) && 'a === 'b && 'a === 1 && 'b === 1).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("single inner join: filter out values on either side on equi-join keys") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val originalQuery = x.join(y,
+ condition = Some(("x.a".attr === "y.a".attr) && ("x.a".attr === 1) && ("y.c".attr > 5)))
+ .analyze
+ val left = x.where(IsNotNull('a) && "x.a".attr === 1)
+ val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5 && "y.a".attr === 1)
+ val correctAnswer = left.join(right, condition = Some("x.a".attr === "y.a".attr)).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("single inner join: filter out nulls on either side on non equal keys") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val originalQuery = x.join(y,
+ condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
+ .analyze
+ val left = x.where(IsNotNull('a) && IsNotNull('b) && "x.b".attr === 1)
+ val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5)
+ val correctAnswer = left.join(right, condition = Some("x.a".attr =!= "y.a".attr)).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("single inner join with pre-existing filters: filter out values on either side") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val originalQuery = x.where('b > 5).join(y.where('a === 10),
+ condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze
+ val left = x.where(IsNotNull('a) && 'a === 10 && IsNotNull('b) && 'b > 5)
+ val right = y.where(IsNotNull('a) && IsNotNull('b) && 'a === 10 && 'b > 5)
+ val correctAnswer = left.join(right,
+ condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("single outer join: no null filters are generated") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val originalQuery = x.join(y, FullOuter,
+ condition = Some("x.a".attr === "y.a".attr)).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, originalQuery)
+ }
+
+ test("multiple inner joins: filter out values on all sides on equi-join keys") {
+ val t1 = testRelation.subquery('t1)
+ val t2 = testRelation.subquery('t2)
+ val t3 = testRelation.subquery('t3)
+ val t4 = testRelation.subquery('t4)
+
+ val originalQuery = t1.where('b > 5)
+ .join(t2, condition = Some("t1.b".attr === "t2.b".attr))
+ .join(t3, condition = Some("t2.b".attr === "t3.b".attr))
+ .join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze
+ val correctAnswer = t1.where(IsNotNull('b) && 'b > 5)
+ .join(t2.where(IsNotNull('b) && 'b > 5), condition = Some("t1.b".attr === "t2.b".attr))
+ .join(t3.where(IsNotNull('b) && 'b > 5), condition = Some("t2.b".attr === "t3.b".attr))
+ .join(t4.where(IsNotNull('b) && 'b > 5), condition = Some("t3.b".attr === "t4.b".attr))
+ .analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("inner join with filter: filter out values on all sides on equi-join keys") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+
+ val originalQuery =
+ x.join(y, Inner, Some("x.a".attr === "y.a".attr)).where("x.a".attr > 5).analyze
+ val correctAnswer = x.where(IsNotNull('a) && 'a.attr > 5)
+ .join(y.where(IsNotNull('a) && 'a.attr > 5), Inner, Some("x.a".attr === "y.a".attr)).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/f96997ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
deleted file mode 100644
index 142e4ae..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules._
-
-class NullFilteringSuite extends PlanTest {
-
- object Optimize extends RuleExecutor[LogicalPlan] {
- val batches = Batch("NullFiltering", Once, NullFiltering) ::
- Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
- }
-
- val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
-
- test("filter: filter out nulls in condition") {
- val originalQuery = testRelation.where('a === 1).analyze
- val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze
- val optimized = Optimize.execute(originalQuery)
- comparePlans(optimized, correctAnswer)
- }
-
- test("single inner join: filter out nulls on either side on equi-join keys") {
- val x = testRelation.subquery('x)
- val y = testRelation.subquery('y)
- val originalQuery = x.join(y,
- condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
- .analyze
- val left = x.where(IsNotNull('a) && IsNotNull('b))
- val right = y.where(IsNotNull('a) && IsNotNull('c))
- val correctAnswer = left.join(right,
- condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
- .analyze
- val optimized = Optimize.execute(originalQuery)
- comparePlans(optimized, correctAnswer)
- }
-
- test("single inner join: filter out nulls on either side on non equal keys") {
- val x = testRelation.subquery('x)
- val y = testRelation.subquery('y)
- val originalQuery = x.join(y,
- condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
- .analyze
- val left = x.where(IsNotNull('a) && IsNotNull('b))
- val right = y.where(IsNotNull('a) && IsNotNull('c))
- val correctAnswer = left.join(right,
- condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
- .analyze
- val optimized = Optimize.execute(originalQuery)
- comparePlans(optimized, correctAnswer)
- }
-
- test("single inner join with pre-existing filters: filter out nulls on either side") {
- val x = testRelation.subquery('x)
- val y = testRelation.subquery('y)
- val originalQuery = x.where('b > 5).join(y.where('c === 10),
- condition = Some("x.a".attr === "y.a".attr)).analyze
- val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5)
- val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10)
- val correctAnswer = left.join(right,
- condition = Some("x.a".attr === "y.a".attr)).analyze
- val optimized = Optimize.execute(originalQuery)
- comparePlans(optimized, correctAnswer)
- }
-
- test("single outer join: no null filters are generated") {
- val x = testRelation.subquery('x)
- val y = testRelation.subquery('y)
- val originalQuery = x.join(y, FullOuter,
- condition = Some("x.a".attr === "y.a".attr)).analyze
- val optimized = Optimize.execute(originalQuery)
- comparePlans(optimized, originalQuery)
- }
-
- test("multiple inner joins: filter out nulls on all sides on equi-join keys") {
- val t1 = testRelation.subquery('t1)
- val t2 = testRelation.subquery('t2)
- val t3 = testRelation.subquery('t3)
- val t4 = testRelation.subquery('t4)
-
- val originalQuery = t1
- .join(t2, condition = Some("t1.b".attr === "t2.b".attr))
- .join(t3, condition = Some("t2.b".attr === "t3.b".attr))
- .join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze
- val correctAnswer = t1.where(IsNotNull('b))
- .join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr))
- .join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr))
- .join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === "t4.b".attr)).analyze
- val optimized = Optimize.execute(originalQuery)
- comparePlans(optimized, correctAnswer)
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org