You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/02/23 12:37:05 UTC

[spark] branch branch-3.4 updated: [SPARK-41793][SQL] Incorrect result for window frames defined by a range clause on large decimals

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 0a440e907f4 [SPARK-41793][SQL] Incorrect result for window frames defined by a range clause on large decimals
0a440e907f4 is described below

commit 0a440e907f488f3c486aa4dce5f9a486bfa84b7c
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Thu Feb 23 20:36:16 2023 +0800

    [SPARK-41793][SQL] Incorrect result for window frames defined by a range clause on large decimals
    
    ### What changes were proposed in this pull request?
    
    Use `DecimalAddNoOverflowCheck` instead of `Add` to create the bound ordering for window range frames.
    
    ### Why are the changes needed?
    
    Before 3.4, `Add` did not check overflow. Instead, we always wrapped `Add` with a `CheckOverflow`. After https://github.com/apache/spark/pull/36698, we made `Add` check overflow by itself. However, the bound ordering of a window range frame uses `Add` to calculate the boundary that is used to determine which input rows lie within the frame boundaries of an output row. As a result, the behavior changed: the boundary calculation is now subject to an extra overflow check.
    
    Technically, we could allow an overflowing value if it is just an intermediate result. So this PR uses `DecimalAddNoOverflowCheck` in place of `Add` to restore the previous behavior.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it restores the previous (before 3.4) behavior.
    
    ### How was this patch tested?
    
    add test
    
    Closes #40138 from ulysses-you/SPARK-41793.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit fec4f7f9aedf55709bcb40e5b504298ff4f2ccc7)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../catalyst/expressions/decimalExpressions.scala    |  2 +-
 .../spark/sql/execution/window/WindowExecBase.scala  |  1 +
 .../spark/sql/DataFrameWindowFramesSuite.scala       | 20 +++++++++++++++++++-
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
index 37e3dd5ea89..01b6e81b3cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
@@ -213,7 +213,7 @@ case class CheckOverflowInSum(
 }
 
 /**
- * An add expression for decimal values which is only used internally by Sum/Avg.
+ * An add expression for decimal values which is only used internally by Sum/Avg/Window.
  *
  * Nota that, this expression does not check overflow which is different with `Add`. When
  * aggregating values, Spark writes the aggregation buffer values to `UnsafeRow` via
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
index 0f19f14576b..44181c79bce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
@@ -128,6 +128,7 @@ trait WindowExecBase extends UnaryExecNode {
             TimestampAddYMInterval(expr, boundOffset, Some(timeZone))
           case (TimestampType | TimestampNTZType, _: DayTimeIntervalType) =>
             TimeAdd(expr, boundOffset, Some(timeZone))
+          case (d: DecimalType, _: DecimalType) => DecimalAddNoOverflowCheck(expr, boundOffset, d)
           case (a, b) if a == b => Add(expr, boundOffset)
         }
         val bound = MutableProjection.create(boundExpr :: Nil, child.output)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala
index ea8cfc7b81a..48a3d740559 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFramesSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, NonFoldableLiteral, RangeFrame, SortOrder, SpecifiedWindowFrame, UnspecifiedFrame}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, NonFoldableLiteral, RangeFrame, SortOrder, SpecifiedWindowFrame, UnaryMinus, UnspecifiedFrame}
 import org.apache.spark.sql.catalyst.plans.logical.{Window => WindowNode}
 import org.apache.spark.sql.expressions.{Window, WindowSpec}
 import org.apache.spark.sql.functions._
@@ -474,4 +474,22 @@ class DataFrameWindowFramesSuite extends QueryTest with SharedSparkSession {
     checkAnswer(df,
       Row(3, 1.5) :: Row(3, 1.5) :: Row(6, 2.0) :: Row(6, 2.0) :: Row(6, 2.0) :: Nil)
   }
+
+  test("SPARK-41793: Incorrect result for window frames defined by a range clause on large " +
+    "decimals") {
+    val window = new WindowSpec(Seq($"a".expr), Seq(SortOrder($"b".expr, Ascending)),
+      SpecifiedWindowFrame(RangeFrame,
+        UnaryMinus(Literal(BigDecimal(10.2345))), Literal(BigDecimal(6.7890))))
+
+    val df = Seq(
+      1 -> "11342371013783243717493546650944543.47",
+      1 -> "999999999999999999999999999999999999.99"
+    ).toDF("a", "b")
+      .select($"a", $"b".cast("decimal(38, 2)"))
+      .select(count("*").over(window))
+
+    checkAnswer(
+      df,
+      Row(1) :: Row(1) :: Nil)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org