Posted to issues@spark.apache.org by "nyingping (Jira)" <ji...@apache.org> on 2022/05/31 09:45:00 UTC

[jira] [Updated] (SPARK-39347) Generate wrong time window when time <0 && abs(time) > window.slideDuration

     [ https://issues.apache.org/jira/browse/SPARK-39347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

nyingping updated SPARK-39347:
------------------------------
    Description: 
h2. Bug description

Since PR [#35362|https://github.com/apache/spark/pull/35362] ("[SPARK-38069][SQL][SS] Improve the calculation of time window") changed the generation strategy of the sliding window to the current one, a new problem arises.

When the event time of a record to be processed is negative and its absolute value is greater than the window length, the generated window is wrong.
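The pitfall can be illustrated with plain remainder arithmetic (a hedged sketch, not Spark's actual code): in Java and Scala the {{%}} operator keeps the sign of the dividend, so a naive {{ts - ts % slide}} puts negative timestamps into the wrong slide bucket unless the remainder is adjusted.

{code:java}
public class WindowStartDemo {
    // Naive window-start: wrong for negative timestamps because
    // Java's % keeps the sign of the dividend.
    static long naiveStart(long ts, long slide) {
        return ts - ts % slide;        // -8 % 10 == -8  ->  start = 0 (wrong bucket)
    }

    // Adjusted version: shift a negative remainder forward by one slide,
    // so the start is always the largest multiple of slide <= ts.
    static long adjustedStart(long ts, long slide) {
        long rem = ts % slide;
        if (rem < 0) rem += slide;     // e.g. -8 -> 2
        return ts - rem;               // start = -10 (correct bucket)
    }

    public static void main(String[] args) {
        long slide = 10;               // 10-second slide, as in the test below
        System.out.println(naiveStart(-8, slide));      // 0   (wrong)
        System.out.println(adjustedStart(-8, slide));   // -10 (correct)
        System.out.println(adjustedStart(12, slide));   // 10  (positives unaffected)
    }
}
{code}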

 

The previous test cases did not fully cover this scenario. For example, the "negative timestamps" test case in DataFrameTimeWindowingSuite.scala:

 
{code:java}
val df1 = Seq(
  ("1970-01-01 00:00:02", 1),
  ("1970-01-01 00:00:12", 2)).toDF("time", "value")
val df2 = Seq(
  (LocalDateTime.parse("1970-01-01T00:00:02"), 1),
  (LocalDateTime.parse("1970-01-01T00:00:12"), 2)).toDF("time", "value")

Seq(df1, df2).foreach { df =>
  checkAnswer(
    df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
      .orderBy($"window.start".asc)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
    Seq(
      Row("1969-12-31 23:59:55", "1970-01-01 00:00:05", 1),
      Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
  )
} {code}
 

 

In this test case, the original timestamp "1970-01-01 00:00:02" is greater than 0, and the absolute value of the generated window start time "1969-12-31 23:59:55" is less than the window size of 10 seconds, so the test passes.

 

But a problem occurs when the timestamp is negative and its absolute value is greater than the window length.

 
{code:java}
val df3 = Seq(
  ("1969-12-31 00:00:02", 1),
  ("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
  (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
  (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")

Seq(df3, df4).foreach { df =>
  checkAnswer(
    df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
      .orderBy($"window.start".asc)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
    Seq(
      Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
      Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
  )
} {code}
Running it produces:

 

 
{code:java}
 == Results ==
!== Correct Answer - 2 ==                      == Spark Answer - 2 ==
!struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2] {code}
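The expected first window can be checked by hand with {{Math.floorMod}}, which (unlike {{%}}) always returns a non-negative remainder and therefore handles negative epoch values correctly. This is a sketch of the general window-start formula under that assumption, not Spark's actual implementation:

{code:java}
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class NegativeWindowDemo {
    // Window start for a timestamp: the closest slide boundary (shifted by
    // the startTime offset) at or before ts. floorMod keeps this correct
    // when ts is negative.
    static long windowStart(long ts, long slide, long offset) {
        return ts - Math.floorMod(ts - offset, slide);
    }

    public static void main(String[] args) {
        long ts = LocalDateTime.parse("1969-12-31T00:00:02")
                .toEpochSecond(ZoneOffset.UTC);   // -86398: negative, |ts| > window
        long start = windowStart(ts, 10, 5);      // slide 10 s, startTime 5 s
        System.out.println(Instant.ofEpochSecond(start)); // 1969-12-30T23:59:55Z
    }
}
{code}

This reproduces the "Correct Answer" column above: the first window for "1969-12-31 00:00:02" must start at "1969-12-30 23:59:55".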
 

 
h2. Fix

Since this was caused by my PR, I am extremely sorry and will fix this bug.

 
h2. Performance

Benchmark result: [old logic (PR #18364)|https://github.com/apache/spark/pull/18364] vs. the fixed version:
{code:java}
Running benchmark: tumbling windows
  Running case: old logic
  Stopped after 407 iterations, 10012 ms
  Running case: new logic
  Stopped after 615 iterations, 10007 ms
Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
old logic                                            17             25           9        580.1           1.7       1.0X
new logic                                            15             16           2        680.8           1.5       1.2X
Running benchmark: sliding windows
  Running case: old logic
  Stopped after 10 iterations, 10296 ms
  Running case: new logic
  Stopped after 15 iterations, 10391 ms
Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
old logic                                          1000           1030          19         10.0         100.0       1.0X
new logic                                           668            693          21         15.0          66.8       1.5X
  {code}
Compared with PR [#35362|https://github.com/apache/spark/pull/35362], the fixed version loses only about 10% of the performance.

 

        Summary: Generate wrong time window when time <0 && abs(time) > window.slideDuration  (was: Generate wrong window when time <0 && abs(time) > window.slideDuration)

> Generate wrong time window when time <0 && abs(time) > window.slideDuration
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-39347
>                 URL: https://issues.apache.org/jira/browse/SPARK-39347
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.3.0
>            Reporter: nyingping
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org