You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/01/17 16:05:35 UTC
spark git commit: [SPARK-23079][SQL] Fix query constraints
propagation with aliases
Repository: spark
Updated Branches:
refs/heads/master 0f8a28617 -> 8598a982b
[SPARK-23079][SQL] Fix query constraints propagation with aliases
## What changes were proposed in this pull request?
Previously, PR #19201 fixed the problem of non-converging constraints.
After that, PR #19149 improved the loop so that constraints are inferred only once.
So the problem of non-converging constraints is gone.
However, the case below will fail.
```
spark.range(5).write.saveAsTable("t")
val t = spark.read.table("t")
val left = t.withColumn("xid", $"id" + lit(1)).as("x")
val right = t.withColumnRenamed("id", "xid").as("y")
val df = left.join(right, "xid").filter("id = 3").toDF()
checkAnswer(df, Row(4, 3))
```
Because `aliasMap` replaces all the aliased children. See the test case in the PR for details.
This PR is to fix this bug by removing useless code for preventing non-converging constraints.
It can also be fixed with #20270, but this is much simpler and cleans up the code.
## How was this patch tested?
Unit test
Author: Wang Gengliang <lt...@gmail.com>
Closes #20278 from gengliangwang/FixConstraintSimple.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8598a982
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8598a982
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8598a982
Branch: refs/heads/master
Commit: 8598a982b4147abe5f1aae005fea0fd5ae395ac4
Parents: 0f8a286
Author: Wang Gengliang <lt...@gmail.com>
Authored: Thu Jan 18 00:05:26 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jan 18 00:05:26 2018 +0800
----------------------------------------------------------------------
.../catalyst/plans/logical/LogicalPlan.scala | 1 +
.../plans/logical/QueryPlanConstraints.scala | 37 +-----------
.../InferFiltersFromConstraintsSuite.scala | 59 +-------------------
.../plans/ConstraintPropagationSuite.scala | 2 +
.../org/apache/spark/sql/SQLQuerySuite.scala | 11 ++++
5 files changed, 17 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8598a982/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index ff2a0ec..c8ccd9b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -255,6 +255,7 @@ abstract class UnaryNode extends LogicalPlan {
case expr: Expression if expr.semanticEquals(e) =>
a.toAttribute
})
+ allConstraints += EqualNullSafe(e, a.toAttribute)
case _ => // Don't change.
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8598a982/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
index 9c0a30a..5c7b8e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
@@ -94,25 +94,16 @@ trait QueryPlanConstraints { self: LogicalPlan =>
case _ => Seq.empty[Attribute]
}
- // Collect aliases from expressions of the whole tree rooted by the current QueryPlan node, so
- // we may avoid producing recursive constraints.
- private lazy val aliasMap: AttributeMap[Expression] = AttributeMap(
- expressions.collect {
- case a: Alias if !a.child.isInstanceOf[Literal] => (a.toAttribute, a.child)
- } ++ children.flatMap(_.asInstanceOf[QueryPlanConstraints].aliasMap))
- // Note: the explicit cast is necessary, since Scala compiler fails to infer the type.
-
/**
* Infers an additional set of constraints from a given set of equality constraints.
* For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns an
* additional constraint of the form `b = 5`.
*/
private def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = {
- val aliasedConstraints = eliminateAliasedExpressionInConstraints(constraints)
var inferredConstraints = Set.empty[Expression]
- aliasedConstraints.foreach {
+ constraints.foreach {
case eq @ EqualTo(l: Attribute, r: Attribute) =>
- val candidateConstraints = aliasedConstraints - eq
+ val candidateConstraints = constraints - eq
inferredConstraints ++= replaceConstraints(candidateConstraints, l, r)
inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
case _ => // No inference
@@ -120,30 +111,6 @@ trait QueryPlanConstraints { self: LogicalPlan =>
inferredConstraints -- constraints
}
- /**
- * Replace the aliased expression in [[Alias]] with the alias name if both exist in constraints.
- * Thus non-converging inference can be prevented.
- * E.g. `Alias(b, f(a)), a = b` infers `f(a) = f(f(a))` without eliminating aliased expressions.
- * Also, the size of constraints is reduced without losing any information.
- * When the inferred filters are pushed down the operators that generate the alias,
- * the alias names used in filters are replaced by the aliased expressions.
- */
- private def eliminateAliasedExpressionInConstraints(constraints: Set[Expression])
- : Set[Expression] = {
- val attributesInEqualTo = constraints.flatMap {
- case EqualTo(l: Attribute, r: Attribute) => l :: r :: Nil
- case _ => Nil
- }
- var aliasedConstraints = constraints
- attributesInEqualTo.foreach { a =>
- if (aliasMap.contains(a)) {
- val child = aliasMap.get(a).get
- aliasedConstraints = replaceConstraints(aliasedConstraints, child, a)
- }
- }
- aliasedConstraints
- }
-
private def replaceConstraints(
constraints: Set[Expression],
source: Expression,
http://git-wip-us.apache.org/repos/asf/spark/blob/8598a982/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index a0708bf..178c4b8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -34,6 +34,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
PushDownPredicate,
InferFiltersFromConstraints,
CombineFilters,
+ SimplifyBinaryComparison,
BooleanSimplification) :: Nil
}
@@ -160,64 +161,6 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
- test("inner join with alias: don't generate constraints for recursive functions") {
- val t1 = testRelation.subquery('t1)
- val t2 = testRelation.subquery('t2)
-
- // We should prevent `Coalese(a, b)` from recursively creating complicated constraints through
- // the constraint inference procedure.
- val originalQuery = t1.select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col))
- // We hide an `Alias` inside the child's child's expressions, to cover the situation reported
- // in [SPARK-20700].
- .select('int_col, 'd, 'a).as("t")
- .join(t2, Inner,
- Some("t.a".attr === "t2.a".attr
- && "t.d".attr === "t2.a".attr
- && "t.int_col".attr === "t2.a".attr))
- .analyze
- val correctAnswer = t1
- .where(IsNotNull('a) && IsNotNull(Coalesce(Seq('a, 'a))) && IsNotNull(Coalesce(Seq('b, 'a)))
- && IsNotNull('b) && IsNotNull(Coalesce(Seq('b, 'b))) && IsNotNull(Coalesce(Seq('a, 'b)))
- && 'a === 'b && 'a === Coalesce(Seq('a, 'a)) && 'a === Coalesce(Seq('a, 'b))
- && 'a === Coalesce(Seq('b, 'a)) && 'b === Coalesce(Seq('a, 'b))
- && 'b === Coalesce(Seq('b, 'a)) && 'b === Coalesce(Seq('b, 'b)))
- .select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col))
- .select('int_col, 'd, 'a).as("t")
- .join(
- t2.where(IsNotNull('a) && IsNotNull(Coalesce(Seq('a, 'a))) &&
- 'a === Coalesce(Seq('a, 'a))),
- Inner,
- Some("t.a".attr === "t2.a".attr && "t.d".attr === "t2.a".attr
- && "t.int_col".attr === "t2.a".attr))
- .analyze
- val optimized = Optimize.execute(originalQuery)
- comparePlans(optimized, correctAnswer)
- }
-
- test("inner join with EqualTo expressions containing part of each other: don't generate " +
- "constraints for recursive functions") {
- val t1 = testRelation.subquery('t1)
- val t2 = testRelation.subquery('t2)
-
- // We should prevent `c = Coalese(a, b)` and `a = Coalese(b, c)` from recursively creating
- // complicated constraints through the constraint inference procedure.
- val originalQuery = t1
- .select('a, 'b, 'c, Coalesce(Seq('b, 'c)).as('d), Coalesce(Seq('a, 'b)).as('e))
- .where('a === 'd && 'c === 'e)
- .join(t2, Inner, Some("t1.a".attr === "t2.a".attr && "t1.c".attr === "t2.c".attr))
- .analyze
- val correctAnswer = t1
- .where(IsNotNull('a) && IsNotNull('c) && 'a === Coalesce(Seq('b, 'c)) &&
- 'c === Coalesce(Seq('a, 'b)))
- .select('a, 'b, 'c, Coalesce(Seq('b, 'c)).as('d), Coalesce(Seq('a, 'b)).as('e))
- .join(t2.where(IsNotNull('a) && IsNotNull('c)),
- Inner,
- Some("t1.a".attr === "t2.a".attr && "t1.c".attr === "t2.c".attr))
- .analyze
- val optimized = Optimize.execute(originalQuery)
- comparePlans(optimized, correctAnswer)
- }
-
test("generate correct filters for alias that don't produce recursive constraints") {
val t1 = testRelation.subquery('t1)
http://git-wip-us.apache.org/repos/asf/spark/blob/8598a982/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index 866ff0d..a37e06d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -134,6 +134,8 @@ class ConstraintPropagationSuite extends SparkFunSuite with PlanTest {
verifyConstraints(aliasedRelation.analyze.constraints,
ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "x") > 10,
IsNotNull(resolveColumn(aliasedRelation.analyze, "x")),
+ resolveColumn(aliasedRelation.analyze, "b") <=> resolveColumn(aliasedRelation.analyze, "y"),
+ resolveColumn(aliasedRelation.analyze, "z") <=> resolveColumn(aliasedRelation.analyze, "x"),
resolveColumn(aliasedRelation.analyze, "z") > 10,
IsNotNull(resolveColumn(aliasedRelation.analyze, "z")))))
http://git-wip-us.apache.org/repos/asf/spark/blob/8598a982/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 7c9840a..d4d0aa4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2717,6 +2717,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
+ test("SPARK-23079: constraints should be inferred correctly with aliases") {
+ withTable("t") {
+ spark.range(5).write.saveAsTable("t")
+ val t = spark.read.table("t")
+ val left = t.withColumn("xid", $"id" + lit(1)).as("x")
+ val right = t.withColumnRenamed("id", "xid").as("y")
+ val df = left.join(right, "xid").filter("id = 3").toDF()
+ checkAnswer(df, Row(4, 3))
+ }
+ }
+
test("SRARK-22266: the same aggregate function was calculated multiple times") {
val query = "SELECT a, max(b+1), max(b+1) + 1 FROM testData2 GROUP BY a"
val df = sql(query)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org