Posted to issues@spark.apache.org by "Andrzej Zera (Jira)" <ji...@apache.org> on 2023/10/23 16:33:00 UTC

[jira] [Updated] (SPARK-45637) Time window aggregation in separate streams followed by stream-stream join not returning results

     [ https://issues.apache.org/jira/browse/SPARK-45637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrzej Zera updated SPARK-45637:
---------------------------------
    Description: 
According to documentation update (SPARK-42591) resulting from SPARK-42376, Spark 3.5.0 should support time-window aggregations in two separate streams followed by stream-stream window join:

[https://github.com/HeartSaVioR/spark/blob/eb0b09f0f2b518915421365a61d1f3d7d58b4404/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995]

However, I was unable to reproduce this example; the query I built doesn't return any results:
{code:java}
from pyspark.sql.functions import rand
from pyspark.sql.functions import expr, window, window_time

spark.conf.set("spark.sql.shuffle.partitions", "1")

impressions = (
    spark    
    .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()    
    .selectExpr("value AS adId", "timestamp AS impressionTime")
)

impressionsWithWatermark = impressions \
    .selectExpr("adId AS impressionAdId", "impressionTime") \
    .withWatermark("impressionTime", "10 seconds")

clicks = (
    spark
    .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
    .where((rand() * 100).cast("integer") < 10)  # 10 out of every 100 impressions result in a click
    .selectExpr("(value - 10) AS adId", "timestamp AS clickTime")  # subtract 10 so that a click with the same id as an impression is generated later (i.e. delayed data)
    .where("adId > 0")
)

clicksWithWatermark = clicks \
    .selectExpr("adId AS clickAdId", "clickTime") \
    .withWatermark("clickTime", "10 seconds")

clicksWindow = clicksWithWatermark.groupBy(      
    window(clicksWithWatermark.clickTime, "1 minute")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
    window(impressionsWithWatermark.impressionTime, "1 minute")
).count()

clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner")

clicksAndImpressions.writeStream \
    .format("memory") \
    .queryName("clicksAndImpressions") \
    .outputMode("append") \
    .start() {code}
 

My intuition is that I'm getting no results because, to output results from the first stateful operator (the time-window aggregation), the watermark needs to pass the window's end timestamp. But once the watermark is past the window's end timestamp, that same window is discarded at the second stateful operator (the stream-stream join) because it's behind the watermark. Indeed, a small hack applied to the event-time column between the two stateful operators (adding one minute) makes it possible to get results:
{code:java}
clicksWindow2 = clicksWithWatermark.groupBy( 
    window(clicksWithWatermark.clickTime, "1 minute")
).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 MINUTE')).drop("window")

impressionsWindow2 = impressionsWithWatermark.groupBy(
    window(impressionsWithWatermark.impressionTime, "1 minute")
).count().withColumn("window_time", window_time("window") + expr('INTERVAL 1 MINUTE')).drop("window")

clicksAndImpressions2 = clicksWindow2.join(impressionsWindow2, "window_time", "inner")

clicksAndImpressions2.writeStream \
    .format("memory") \
    .queryName("clicksAndImpressions2") \
    .outputMode("append") \
    .start()  {code}
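To illustrate the intuition above, here is a tiny plain-Python model (not Spark code; the emission and late-row rules below are deliberately simplified approximations of Structured Streaming's actual semantics) showing why the window's result is effectively already "late" by the time it reaches the join, and why shifting the event-time column forward by one window length avoids this:
{code:python}
# Simplified model of two chained stateful operators.
# Assumption 1: a windowed aggregation in append mode emits a window
#   only once the watermark reaches the window's end timestamp.
# Assumption 2: a downstream stateful operator drops rows whose event
#   time is at or behind the watermark.

WINDOW_END = 60  # window [0, 60) in seconds


def aggregation_emits(window_end: int, watermark: int) -> bool:
    """Aggregation finalizes a window only after the watermark passes it."""
    return watermark >= window_end


def join_accepts(event_time: int, watermark: int) -> bool:
    """The join treats rows at or behind the watermark as late data."""
    return event_time > watermark


def row_survives(event_time_shift: int) -> bool:
    """Trace one window through both operators.

    The earliest watermark at which the aggregation can emit the window
    is exactly the window end; at that moment the join checks the row's
    event time (the window end, optionally shifted forward)."""
    watermark = WINDOW_END                       # earliest emitting watermark
    emitted = aggregation_emits(WINDOW_END, watermark)
    event_time = WINDOW_END + event_time_shift   # event-time column of the row
    return emitted and join_accepts(event_time, watermark)


print(row_survives(0))    # unmodified query: False (row is behind watermark)
print(row_survives(60))   # +1 minute hack: True (row stays ahead of watermark)
{code}
Under this model, the unmodified query can never produce output: the aggregation's result carries an event time equal to the watermark that released it, so the join immediately considers it late.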
 


> Time window aggregation in separate streams followed by stream-stream join not returning results
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-45637
>                 URL: https://issues.apache.org/jira/browse/SPARK-45637
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.5.0
>         Environment: I'm using Spark 3.5.0 on Databricks Runtime 14.1
>            Reporter: Andrzej Zera
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
