Posted to commits@spark.apache.org by ka...@apache.org on 2023/02/06 02:47:44 UTC
[spark] branch master updated: [SPARK-39347][SS] Bug fix for time window calculation when event time < 0
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 87d4eb609d7 [SPARK-39347][SS] Bug fix for time window calculation when event time < 0
87d4eb609d7 is described below
commit 87d4eb609d7370d79f42cbcc5985c4fede1781d6
Author: Wei Liu <we...@databricks.com>
AuthorDate: Mon Feb 6 11:47:27 2023 +0900
[SPARK-39347][SS] Bug fix for time window calculation when event time < 0
### What changes were proposed in this pull request?
I tried to understand what was introduced in https://github.com/apache/spark/pull/36737, made the code more readable, and added some tests. Many thanks to nyingping!
The change in https://github.com/apache/spark/pull/35362 introduced a bug when the `timestamp` is less than 0, i.e. before `1970-01-01 00:00:00 UTC`: for some windows, Spark returns a wrong `windowStart` time. The root cause of this bug is how the modulo operator (`%`) works with negative numbers.
For example,
```
scala> 1 % 3
res0: Int = 1
scala> -1 % 3
res1: Int = -1 // Mathematically it should be 2 here
```
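As a minimal sketch of the difference (the object and method names here are illustrative and not part of Spark), the truncated remainder that `%` produces on the JVM can be compared with a floored remainder, which is what the window calculation needs. `Math.floorMod` is the standard Java method for the floored variant.
```scala
object ModuloDemo {
  // Truncated remainder: on the JVM the sign of the result follows the dividend.
  def truncatedMod(a: Long, n: Long): Long = a % n

  // Floored remainder for n > 0: the result is always in [0, n).
  def flooredMod(a: Long, n: Long): Long = {
    val r = a % n
    if (r < 0) r + n else r
  }

  def main(args: Array[String]): Unit = {
    println(truncatedMod(-1, 3))     // -1
    println(flooredMod(-1, 3))       // 2
    println(Math.floorMod(-1L, 3L))  // 2, agrees with flooredMod when n > 0
  }
}
```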
This leads to a wrong calculation result for `windowStart`. For a concrete example:
```
* Example calculation:
* For simplicity assume windowDuration = slideDuration.
* | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x |
* |                         |----l1 ----|---- l2 -----|
*                       lastStart   timestamp   lastStartWrong
* Normally when timestamp > startTime (or equally remainder > 0), we get
* l1 = remainder = (timestamp - startTime) % slideDuration, lastStart = timeStamp - remainder
* However, when timestamp < startTime (or equally remainder < 0), the value of remainder is
* -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong.
* So we need to subtract a slideDuration.
```
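The calculation above can be sketched on plain `Long` values. This is a simplified illustration, not the Catalyst expression code: the names `lastStartOld` and `lastStartNew` are hypothetical, and the two bodies mirror the removed and added formulas in this commit's diff. With small numbers it shows that the old formula, which adds one `slideDuration` before taking the remainder, only compensates while `timestamp - startTime` is no more than one `slideDuration` below zero.
```scala
object LastStartDemo {
  // Formula before this commit (the removed line in the diff).
  def lastStartOld(timestamp: Long, startTime: Long, slide: Long): Long =
    timestamp - (timestamp - startTime + slide) % slide

  // Formula after this commit: shift a negative remainder up by one slide.
  def lastStartNew(timestamp: Long, startTime: Long, slide: Long): Long = {
    val remainder = (timestamp - startTime) % slide
    timestamp - (if (remainder < 0) remainder + slide else remainder)
  }

  def main(args: Array[String]): Unit = {
    // startTime = 0, slide = 10 in every call.
    println(lastStartOld(13, 0, 10))   // 10
    println(lastStartNew(13, 0, 10))   // 10
    println(lastStartOld(-8, 0, 10))   // -10
    println(lastStartNew(-8, 0, 10))   // -10
    println(lastStartOld(-18, 0, 10))  // -10, one slide too late
    println(lastStartNew(-18, 0, 10))  // -20
  }
}
```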
### Why are the changes needed?
This is a bug fix.
Example from the original PR https://github.com/apache/spark/pull/36737:
Here df3 and df4 have times before 1970, so timestamp < 0.
```
val df3 = Seq(
("1969-12-31 00:00:02", 1),
("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
(LocalDateTime.parse("1969-12-31T00:00:02"), 1),
(LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")

Seq(df3, df4).foreach { df =>
checkAnswer(
df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
.orderBy($"window.start".asc)
.select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
Seq(
Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
)
}
```
Without the change, this test fails with:
```
== Results ==
!== Correct Answer - 2 == == Spark Answer - 2 ==
!struct<> struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
![1969-12-30 23:59:55,1969-12-31 00:00:05,1] [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2] [1969-12-31 00:00:15,1969-12-31 00:00:25,2]
```
Notice how the result is shifted by one `slideDuration`. The first row should be `[1969-12-30 23:59:55,1969-12-31 00:00:05,1]`, but Spark returns `[1969-12-31 00:00:05,1969-12-31 00:00:15,1]`, which is shifted right by one `slideDuration` (10 seconds).
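The expected rows can be checked by hand with a small standalone sketch. It makes several simplifying assumptions that differ from Spark itself: times are interpreted as UTC, arithmetic is in whole seconds rather than microseconds, only the tumbling case (`windowDuration == slideDuration`) is handled, and `NegativeEpochWindow` and its methods are hypothetical names. For `1969-12-31 00:00:02` the epoch value is -86398 seconds, so with `startTime = 5` the remainder is `-86403 % 10 = -3`, which the corrected formula turns into 7.
```scala
import java.time.{LocalDateTime, ZoneOffset}

object NegativeEpochWindow {
  private def toEpochSeconds(s: String): Long =
    LocalDateTime.parse(s).toEpochSecond(ZoneOffset.UTC)

  private def fromEpochSeconds(sec: Long): String =
    LocalDateTime.ofEpochSecond(sec, 0, ZoneOffset.UTC).toString

  // Tumbling window using the corrected remainder handling.
  // Returns (windowStart, windowEnd) as ISO-8601 local date-times.
  def window(time: String, duration: Long, startTime: Long): (String, String) = {
    val ts = toEpochSeconds(time)
    val remainder = (ts - startTime) % duration
    val start = ts - (if (remainder < 0) remainder + duration else remainder)
    (fromEpochSeconds(start), fromEpochSeconds(start + duration))
  }

  def main(args: Array[String]): Unit = {
    println(window("1969-12-31T00:00:02", 10, 5)) // (1969-12-30T23:59:55,1969-12-31T00:00:05)
    println(window("1969-12-31T00:00:12", 10, 5)) // (1969-12-31T00:00:05,1969-12-31T00:00:15)
  }
}
```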
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
### Benchmark results:
1. Burak's original implementation
```
[info] Apple M1 Max
[info] tumbling windows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version 10 17 14 962.7 1.0 1.0X
[info] Running benchmark: sliding windows
[info] Running case: burak version
[info] Stopped after 16 iterations, 10604 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version 646 663 19 15.5 64.6 1.0X
```
2. Current implementation (buggy)
```
[info] Running benchmark: tumbling windows
[info] Running case: current - buggy
[info] Stopped after 637 iterations, 10008 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] tumbling windows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy 10 16 12 1042.7 1.0 1.0X
[info] Running benchmark: sliding windows
[info] Running case: current - buggy
[info] Stopped after 16 iterations, 10143 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy 617 634 10 16.2 61.7 1.0X
```
3. Proposed change in this PR:
```
[info] Apple M1 Max
[info] tumbling windows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change 10 16 11 981.2 1.0 1.0X
[info] Running benchmark: sliding windows
[info] Running case: purposed change
[info] Stopped after 18 iterations, 10122 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change 548 562 19 18.3 54.8 1.0X
```
Note that I ran them separately, because I found that when these benchmarks are run sequentially, the later one always gets a performance gain. I think the computer is doing some optimizations.
Closes #39843 from WweiL/SPARK-38069-time-window-fix.
Lead-authored-by: Wei Liu <we...@databricks.com>
Co-authored-by: nieyingping <ni...@alphadata.com.cn>
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
.../sql/catalyst/analysis/ResolveTimeWindows.scala | 10 +++---
.../spark/sql/DataFrameTimeWindowingSuite.scala | 36 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index 6378f4eedd3..f3fc6c9e9db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW, WINDOW_TIME}
@@ -49,7 +49,8 @@ object TimeWindowing extends Rule[LogicalPlan] {
* The windows are calculated as below:
* maxNumOverlapping <- ceil(windowDuration / slideDuration)
* for (i <- 0 until maxNumOverlapping)
- * lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration
+ * remainder <- (timestamp - startTime) % slideDuration
+ * lastStart <- timestamp - ((remainder < 0) ? remainder + slideDuration : remainder)
* windowStart <- lastStart - i * slideDuration
* windowEnd <- windowStart + windowDuration
* return windowStart, windowEnd
@@ -103,8 +104,9 @@ object TimeWindowing extends Rule[LogicalPlan] {
def getWindow(i: Int, dataType: DataType): Expression = {
val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType)
- val lastStart = timestamp - (timestamp - window.startTime
- + window.slideDuration) % window.slideDuration
+ val remainder = (timestamp - window.startTime) % window.slideDuration
+ val lastStart = timestamp - CaseWhen(Seq((LessThan(remainder, 0),
+ remainder + window.slideDuration)), Some(remainder))
val windowStart = lastStart - i * window.slideDuration
val windowEnd = windowStart + window.windowDuration
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index 0bbb9460feb..367cdbe8447 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -314,6 +314,42 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
)
}
+
+ val df3 = Seq(
+ ("1969-12-31 00:00:02", 1),
+ ("1969-12-31 00:00:12", 2)).toDF("time", "value")
+ val df4 = Seq(
+ (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
+ (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")
+
+ Seq(df3, df4).foreach { df =>
+ checkAnswer(
+ df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
+ .orderBy($"window.start".asc)
+ .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+ Seq(
+ Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
+ Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
+ )
+ }
+
+ val df5 = Seq(
+ ("1968-12-31 00:00:02", 1),
+ ("1968-12-31 00:00:12", 2)).toDF("time", "value")
+ val df6 = Seq(
+ (LocalDateTime.parse("1968-12-31T00:00:02"), 1),
+ (LocalDateTime.parse("1968-12-31T00:00:12"), 2)).toDF("time", "value")
+
+ Seq(df5, df6).foreach { df =>
+ checkAnswer(
+ df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
+ .orderBy($"window.start".asc)
+ .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+ Seq(
+ Row("1968-12-30 23:59:55", "1968-12-31 00:00:05", 1),
+ Row("1968-12-31 00:00:05", "1968-12-31 00:00:15", 2))
+ )
+ }
}
test("multiple time windows in a single operator throws nice exception") {