Posted to commits@spark.apache.org by ka...@apache.org on 2023/02/06 02:47:44 UTC

[spark] branch master updated: [SPARK-39347][SS] Bug fix for time window calculation when event time < 0

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 87d4eb609d7 [SPARK-39347][SS] Bug fix for time window calculation when event time < 0
87d4eb609d7 is described below

commit 87d4eb609d7370d79f42cbcc5985c4fede1781d6
Author: Wei Liu <we...@databricks.com>
AuthorDate: Mon Feb 6 11:47:27 2023 +0900

    [SPARK-39347][SS] Bug fix for time window calculation when event time < 0
    
    ### What changes were proposed in this pull request?
    
    I tried to understand what was introduced in https://github.com/apache/spark/pull/36737, made the code more readable, and added some tests. Many thanks to nyingping!
    
    The change in https://github.com/apache/spark/pull/35362 introduced a bug when the `timestamp` is less than 0, i.e. before `1970-01-01 00:00:00 UTC`: for some windows, Spark returns a wrong `windowStart` time. The root cause is how the modulo operator (%) behaves with negative numbers.
    
    For example,
    
    ```
    scala> 1 % 3
    res0: Int = 1
    
    scala> -1 % 3
    res1: Int = -1 // Mathematically it should be 2 here
    ```
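    For reference, `java.lang.Math.floorMod` gives the floored result, which takes the sign of the divisor and matches the mathematical expectation here:
    ```
    scala> Math.floorMod(-1, 3)
    res2: Int = 2
    ```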
    This leads to a wrong calculation of `windowStart`. For a concrete example:
    
    ```
    * Example calculation:
       * For simplicity assume windowDuration = slideDuration.
       * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x |
       * |                         |----l1 ----|---- l2 -----|
       *                        lastStart   timestamp   lastStartWrong
       * Normally, when timestamp >= startTime (or equally remainder >= 0), we get
       * l1 = remainder = (timestamp - startTime) % slideDuration, lastStart = timestamp - remainder
       * However, when timestamp < startTime (or equally remainder < 0), the value of remainder is
       * -l2 (note the negative sign), and lastStart then lands at the position of lastStartWrong.
       * So we need to subtract a slideDuration from lastStartWrong, i.e. add slideDuration to
       * the negative remainder.
    ```
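    
    As a minimal sketch in plain Scala (Longs in a common time unit, not the actual Catalyst expressions; the helper name is illustrative only), the adjusted calculation looks like:
    ```
    // Illustrative helper mirroring the fixed formula above.
    def lastStart(timestamp: Long, startTime: Long, slideDuration: Long): Long = {
      val remainder = (timestamp - startTime) % slideDuration
      // For timestamps before startTime the remainder is negative,
      // so shift it back into [0, slideDuration).
      timestamp - (if (remainder < 0) remainder + slideDuration else remainder)
    }
    ```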
    
    ### Why are the changes needed?
    
    This is a bug fix.
    
    Example from the original PR https://github.com/apache/spark/pull/36737:
    
    Here df3 and df4 have times before 1970, so timestamp < 0.
    ```
    val df3 = Seq(
          ("1969-12-31 00:00:02", 1),
          ("1969-12-31 00:00:12", 2)).toDF("time", "value")
    val df4 = Seq(
          (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
          (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")

    Seq(df3, df4).foreach { df =>
          checkAnswer(
            df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
              .orderBy($"window.start".asc)
              .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
            Seq(
              Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
              Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
          )
    }
    ```
    Without the change, this test fails with:
    ```
    == Results ==
    !== Correct Answer - 2 ==                      == Spark Answer - 2 ==
    !struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
    ![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
    ![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2]
    ```
    Notice how the result is shifted by one `slideDuration`. It should start with `[1969-12-30 23:59:55,1969-12-31 00:00:05,1]`, but Spark returns `[1969-12-31 00:00:05,1969-12-31 00:00:15,1]`, right-shifted by one `slideDuration` (10 seconds).
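    
    To make the shift concrete, here is a standalone sketch (plain Longs in seconds, not Spark code; the variable names are illustrative) reproducing the first row above:
    ```
    // 1969-12-31 00:00:02 UTC is -86398 seconds since the epoch.
    val timestamp = -86398L
    val startTime = 5L  // "5 seconds"
    val slide = 10L     // "10 seconds"

    // Old formula: (timestamp - startTime + slide) % slide == -3, so
    // lastStart == -86395, i.e. 1969-12-31 00:00:05 (shifted right by one slide).
    val lastStartBuggy = timestamp - (timestamp - startTime + slide) % slide

    // Fixed formula: remainder == -3 < 0 is adjusted to 7, so
    // lastStart == -86405, i.e. 1969-12-30 23:59:55 (the expected window start).
    val remainder = (timestamp - startTime) % slide
    val lastStartFixed = timestamp - (if (remainder < 0) remainder + slide else remainder)
    ```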
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    ### Benchmark results:
    1. Burak's original implementation
    ```
    [info] Apple M1 Max
    [info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    [info] ------------------------------------------------------------------------------------------------------------------------
    [info] burak version                                        10             17          14        962.7           1.0       1.0X
    [info] Running benchmark: sliding windows
    [info]   Running case: burak version
    [info]   Stopped after 16 iterations, 10604 ms
    [info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
    [info] Apple M1 Max
    [info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    [info] ------------------------------------------------------------------------------------------------------------------------
    [info] burak version                                       646            663          19         15.5          64.6       1.0X
    ```
    
    2. Current implementation (buggy)
    ```
    [info] Running benchmark: tumbling windows
    [info]   Running case: current - buggy
    [info]   Stopped after 637 iterations, 10008 ms
    [info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
    [info] Apple M1 Max
    [info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    [info] ------------------------------------------------------------------------------------------------------------------------
    [info] current - buggy                                      10             16          12       1042.7           1.0       1.0X
    [info] Running benchmark: sliding windows
    [info]   Running case: current - buggy
    [info]   Stopped after 16 iterations, 10143 ms
    [info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
    [info] Apple M1 Max
    [info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    [info] ------------------------------------------------------------------------------------------------------------------------
    [info] current - buggy                                     617            634          10         16.2          61.7       1.0X
    ```
    
    3. Proposed change in this PR:
    ```
    [info] Apple M1 Max
    [info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    [info] ------------------------------------------------------------------------------------------------------------------------
    [info] purposed change                                      10             16          11        981.2           1.0       1.0X
    [info] Running benchmark: sliding windows
    [info]   Running case: purposed change
    [info]   Stopped after 18 iterations, 10122 ms
    [info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
    [info] Apple M1 Max
    [info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    [info] ------------------------------------------------------------------------------------------------------------------------
    [info] purposed change                                     548            562          19         18.3          54.8       1.0X
    ```
    Note that I ran the benchmarks separately: when they run sequentially, the later one always gets a performance gain, presumably from JVM warm-up (JIT compilation and caching).
    
    Closes #39843 from WweiL/SPARK-38069-time-window-fix.
    
    Lead-authored-by: Wei Liu <we...@databricks.com>
    Co-authored-by: nieyingping <ni...@alphadata.com.cn>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/catalyst/analysis/ResolveTimeWindows.scala | 10 +++---
 .../spark/sql/DataFrameTimeWindowingSuite.scala    | 36 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index 6378f4eedd3..f3fc6c9e9db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
 import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW, WINDOW_TIME}
@@ -49,7 +49,8 @@ object TimeWindowing extends Rule[LogicalPlan] {
    * The windows are calculated as below:
    * maxNumOverlapping <- ceil(windowDuration / slideDuration)
    * for (i <- 0 until maxNumOverlapping)
-   *   lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration
+   *   remainder <- (timestamp - startTime) % slideDuration
+   *   lastStart <- timestamp - ((remainder < 0) ? remainder + slideDuration : remainder)
    *   windowStart <- lastStart - i * slideDuration
    *   windowEnd <- windowStart + windowDuration
    *   return windowStart, windowEnd
@@ -103,8 +104,9 @@ object TimeWindowing extends Rule[LogicalPlan] {
 
         def getWindow(i: Int, dataType: DataType): Expression = {
           val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType)
-          val lastStart = timestamp - (timestamp - window.startTime
-            + window.slideDuration) % window.slideDuration
+          val remainder = (timestamp - window.startTime) % window.slideDuration
+          val lastStart = timestamp - CaseWhen(Seq((LessThan(remainder, 0),
+            remainder + window.slideDuration)), Some(remainder))
           val windowStart = lastStart - i * window.slideDuration
           val windowEnd = windowStart + window.windowDuration
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index 0bbb9460feb..367cdbe8447 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -314,6 +314,42 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
           Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
       )
     }
+
+    val df3 = Seq(
+      ("1969-12-31 00:00:02", 1),
+      ("1969-12-31 00:00:12", 2)).toDF("time", "value")
+    val df4 = Seq(
+      (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
+      (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")
+
+    Seq(df3, df4).foreach { df =>
+      checkAnswer(
+        df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
+          .orderBy($"window.start".asc)
+          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+        Seq(
+          Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
+          Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
+      )
+    }
+
+    val df5 = Seq(
+      ("1968-12-31 00:00:02", 1),
+      ("1968-12-31 00:00:12", 2)).toDF("time", "value")
+    val df6 = Seq(
+      (LocalDateTime.parse("1968-12-31T00:00:02"), 1),
+      (LocalDateTime.parse("1968-12-31T00:00:12"), 2)).toDF("time", "value")
+
+    Seq(df5, df6).foreach { df =>
+      checkAnswer(
+        df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
+          .orderBy($"window.start".asc)
+          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+        Seq(
+          Row("1968-12-30 23:59:55", "1968-12-31 00:00:05", 1),
+          Row("1968-12-31 00:00:05", "1968-12-31 00:00:15", 2))
+      )
+    }
   }
 
   test("multiple time windows in a single operator throws nice exception") {

