You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/06/11 06:42:51 UTC

[spark] branch branch-3.0 updated: [SPARK-31958][SQL] normalize special floating numbers in subquery

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 593b423  [SPARK-31958][SQL] normalize special floating numbers in subquery
593b423 is described below

commit 593b42323255f44b268ca5148f1fa817f3c01de7
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Jun 11 06:39:14 2020 +0000

    [SPARK-31958][SQL] normalize special floating numbers in subquery
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/23388 .
    
    https://github.com/apache/spark/pull/23388 has an issue: it doesn't handle subquery expressions and assumes they will be turned into joins. However, this is not true for non-correlated subquery expressions.
    
    This PR fixes this issue. It now doesn't skip `Subquery`, and subquery expressions will be handled by `OptimizeSubqueries`, which runs the optimizer with the subquery.
    
    Note that, correlated subquery expressions will be handled twice: once in `OptimizeSubqueries`, once later when it becomes join. This is OK as `NormalizeFloatingNumbers` is idempotent now.
    
    ### Why are the changes needed?
    
    fix a bug
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, see the newly added test.
    
    ### How was this patch tested?
    
    new test
    
    Closes #28785 from cloud-fan/normalize.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 6fb9c80da129d0b43f9ff5b8be6ce8bad992a4ed)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../catalyst/optimizer/NormalizeFloatingNumbers.scala  |  4 ----
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala     | 18 ++++++++++++++++++
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala
index 5f94af5..4373820 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala
@@ -56,10 +56,6 @@ import org.apache.spark.sql.types._
 object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
 
   def apply(plan: LogicalPlan): LogicalPlan = plan match {
-    // A subquery will be rewritten into join later, and will go through this rule
-    // eventually. Here we skip subquery, as we only need to run this rule once.
-    case _: Subquery => plan
-
     case _ => plan transform {
       case w: Window if w.partitionSpec.exists(p => needNormalize(p)) =>
         // Although the `windowExpressions` may refer to `partitionSpec` expressions, we don't need
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index a23e583..093f2db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3449,6 +3449,24 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     checkAnswer(sql("select CAST(-32768 as short) DIV CAST (-1 as short)"),
       Seq(Row(Short.MinValue.toLong * -1)))
   }
+
+  test("normalize special floating numbers in subquery") {
+    withTempView("v1", "v2", "v3") {
+      Seq(-0.0).toDF("d").createTempView("v1")
+      Seq(0.0).toDF("d").createTempView("v2")
+      spark.range(2).createTempView("v3")
+
+      // non-correlated subquery
+      checkAnswer(sql("SELECT (SELECT v1.d FROM v1 JOIN v2 ON v1.d = v2.d)"), Row(-0.0))
+      // correlated subquery
+      checkAnswer(
+        sql(
+          """
+            |SELECT id FROM v3 WHERE EXISTS
+            |  (SELECT v1.d FROM v1 JOIN v2 ON v1.d = v2.d WHERE id > 0)
+            |""".stripMargin), Row(1))
+    }
+  }
 }
 
 case class Foo(bar: Option[String])


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org