You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/09/09 16:07:36 UTC
spark git commit: [SPARK-25368][SQL] Incorrect predicate pushdown
returns wrong result
Repository: spark
Updated Branches:
refs/heads/master 88a930dfa -> 77c996403
[SPARK-25368][SQL] Incorrect predicate pushdown returns wrong result
## What changes were proposed in this pull request?
How to reproduce:
```scala
val df1 = spark.createDataFrame(Seq(
(1, 1)
)).toDF("a", "b").withColumn("c", lit(null).cast("int"))
val df2 = df1.union(df1).withColumn("d", spark_partition_id).filter($"c".isNotNull)
df2.show
+---+---+----+---+
| a| b| c| d|
+---+---+----+---+
| 1| 1|null| 0|
| 1| 1|null| 1|
+---+---+----+---+
```
`filter($"c".isNotNull)` was transformed to `(null <=> c#10)` before https://github.com/apache/spark/pull/19201, but it has been transformed to `(c#10 = null)` since https://github.com/apache/spark/pull/20155. This PR reverts it to `(null <=> c#10)` to fix this issue.
## How was this patch tested?
unit tests
Closes #22368 from wangyum/SPARK-25368.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: gatorsmile <ga...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77c99640
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77c99640
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77c99640
Branch: refs/heads/master
Commit: 77c996403d5c761f0dfea64c5b1cb7480ba1d3ac
Parents: 88a930d
Author: Yuming Wang <yu...@ebay.com>
Authored: Sun Sep 9 09:07:31 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Sun Sep 9 09:07:31 2018 -0700
----------------------------------------------------------------------
.../sql/catalyst/plans/logical/LogicalPlan.scala | 2 +-
.../InferFiltersFromConstraintsSuite.scala | 2 +-
.../org/apache/spark/sql/DataFrameSuite.scala | 17 +++++++++++++++++
3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/77c99640/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 0e4456a..5f13662 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -159,7 +159,7 @@ abstract class UnaryNode extends LogicalPlan {
var allConstraints = child.constraints.asInstanceOf[Set[Expression]]
projectList.foreach {
case a @ Alias(l: Literal, _) =>
- allConstraints += EqualTo(a.toAttribute, l)
+ allConstraints += EqualNullSafe(a.toAttribute, l)
case a @ Alias(e, _) =>
// For every alias in `projectList`, replace the reference in constraints by its attribute.
allConstraints ++= allConstraints.map(_ transform {
http://git-wip-us.apache.org/repos/asf/spark/blob/77c99640/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index e4671f0..a40ba2d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -196,7 +196,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
test("constraints should be inferred from aliased literals") {
val originalLeft = testRelation.subquery('left).as("left")
- val optimizedLeft = testRelation.subquery('left).where(IsNotNull('a) && 'a === 2).as("left")
+ val optimizedLeft = testRelation.subquery('left).where(IsNotNull('a) && 'a <=> 2).as("left")
val right = Project(Seq(Literal(2).as("two")), testRelation.subquery('right)).as("right")
val condition = Some("left.a".attr === "right.two".attr)
http://git-wip-us.apache.org/repos/asf/spark/blob/77c99640/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 45b17b3..435b887 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2552,4 +2552,21 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
}
+ test("SPARK-25368 Incorrect predicate pushdown returns wrong result") {
+ def check(newCol: Column, filter: Column, result: Seq[Row]): Unit = {
+ val df1 = spark.createDataFrame(Seq(
+ (1, 1)
+ )).toDF("a", "b").withColumn("c", newCol)
+
+ val df2 = df1.union(df1).withColumn("d", spark_partition_id).filter(filter)
+ checkAnswer(df2, result)
+ }
+
+ check(lit(null).cast("int"), $"c".isNull, Seq(Row(1, 1, null, 0), Row(1, 1, null, 1)))
+ check(lit(null).cast("int"), $"c".isNotNull, Seq())
+ check(lit(2).cast("int"), $"c".isNull, Seq())
+ check(lit(2).cast("int"), $"c".isNotNull, Seq(Row(1, 1, 2, 0), Row(1, 1, 2, 1)))
+ check(lit(2).cast("int"), $"c" === 2, Seq(Row(1, 1, 2, 0), Row(1, 1, 2, 1)))
+ check(lit(2).cast("int"), $"c" =!= 2, Seq())
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org