Posted to issues@spark.apache.org by "Taran Saini (Jira)" <ji...@apache.org> on 2021/08/27 06:06:00 UTC

[jira] [Comment Edited] (SPARK-24156) Enable no-data micro batches for more eager streaming state clean up

    [ https://issues.apache.org/jira/browse/SPARK-24156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17405606#comment-17405606 ] 

Taran Saini edited comment on SPARK-24156 at 8/27/21, 6:05 AM:
---------------------------------------------------------------

[~kabhwan] We have a Kafka broker from which we continuously read via a Spark structured stream; we perform some aggregations on it before writing it out to a file sink (S3) in APPEND mode.

Watermarking is used here to accommodate late events (15 minutes; we have tried smaller values as well):
{code:java}
.withWatermark("localTimeStamp", config.getString("spark.watermarkInterval"))
{code}
The groupBy clause is used to define the window (batch) and sliding interval (15 minutes each in our case):
{code:java}
.groupBy(window($"localTimeStamp", batchInterval, config.getString("spark.slideInterval")),..,..)
{code}
Post aggregation(s), here is the snippet that writes the stream results to the file sink:
{code:java}
.repartition(1)
.writeStream
.partitionBy("date", "hour", "windowMinute")
.format("parquet")
.option("checkpointLocation", config.getString("spark.checkpointLocation"))
.trigger(Trigger.ProcessingTime(config.getString("spark.triggerInterval")))
.outputMode("append")
.option("path", s"${s3SinkLocation}/parquet/")
.start()
{code}
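
For reference, here is a minimal end-to-end sketch assembling the snippets above into one self-contained query. The column names, config-driven values, partition columns, and sink options come from the snippets in this comment; the Kafka connection options, the JSON event schema, the exact definitions of the derived partition columns, and the literal 15-minute intervals are assumptions for illustration only:
{code:java}
// Minimal sketch of the pipeline described above (assumptions marked below).
// Requires the spark-sql-kafka-0-10 package on the classpath for the source.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

object WatermarkAppendRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("watermark-append-repro").getOrCreate()
    import spark.implicits._

    // Assumed event schema: an event-time column plus one aggregation key.
    val schema = new StructType()
      .add("localTimeStamp", TimestampType)
      .add("key", StringType)

    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // assumption
      .option("subscribe", "events")                    // assumption
      .load()
      .select(from_json($"value".cast("string"), schema).as("e"))
      .select("e.*")

    val aggregated = events
      .withWatermark("localTimeStamp", "15 minutes")                          // spark.watermarkInterval
      .groupBy(window($"localTimeStamp", "15 minutes", "15 minutes"), $"key") // batchInterval / spark.slideInterval
      .count()
      // Partition columns derived from the window start (assumed definitions).
      .withColumn("date", to_date($"window.start"))
      .withColumn("hour", hour($"window.start"))
      .withColumn("windowMinute", minute($"window.start"))

    aggregated
      .repartition(1)
      .writeStream
      .partitionBy("date", "hour", "windowMinute")
      .format("parquet")
      .option("checkpointLocation", "s3://bucket/checkpoints/") // spark.checkpointLocation
      .trigger(Trigger.ProcessingTime("15 minutes"))            // spark.triggerInterval
      .outputMode("append")
      .option("path", "s3://bucket/parquet/")                   // s3SinkLocation
      .start()
      .awaitTermination()
  }
}
{code}
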
Here are the issues we observe:
 1. The stream doesn't write output to the sink unless there is new data; so if no events are fired in the current window, the previous window's results don't get flushed out (see the sketch after this list).
 2. Even with a continuous inflow of events, partitioned output directories are not consistently created every trigger interval (i.e. every 15 minutes); it works many times but not always. We did try setting `.option("parquet.block.size", 1024)`, thinking it might flush events every window once the size exceeds 1024 bytes, but that also does not produce the desired results.
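
Regarding issue 1, for completeness: this ticket is the one that introduced no-data micro batches, which are what should allow the previous window to be flushed without new input. Here is a minimal sketch of setting the gating config explicitly; our assumption is that the key is {{spark.sql.streaming.noDataMicroBatches.enabled}} and that it defaults to true in 3.x (please correct us if that is wrong):
{code:java}
// Sketch (assumption): explicitly enable no-data micro batches before
// starting the query. The config is believed to default to true in 3.x,
// so setting it here only makes the intent explicit.
spark.conf.set("spark.sql.streaming.noDataMicroBatches.enabled", "true")

// Equivalently, at session build time:
// SparkSession.builder
//   .config("spark.sql.streaming.noDataMicroBatches.enabled", "true")
//   .getOrCreate()
{code}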

To summarise, `watermarking + append mode + file sink` is not working as expected (i.e. as the Spark documentation says it should). We are using Spark 3.0.x.



> Enable no-data micro batches for more eager streaming state clean up 
> ---------------------------------------------------------------------
>
>                 Key: SPARK-24156
>                 URL: https://issues.apache.org/jira/browse/SPARK-24156
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>            Priority: Major
>             Fix For: 2.4.0
>
>
> Currently, MicroBatchExecution in Structured Streaming runs batches only when there is new data to process. This is sensible in most cases, as we don't want to unnecessarily use resources when there is nothing new to process. However, for some stateful streaming queries, this delays state clean-up as well as clean-up-based output. For example, consider a streaming aggregation query with watermark-based state cleanup. The watermark is updated after every batch with new data completes. The updated value is used in the next batch to clean up state and to output finalized aggregates in append mode. However, if there is no data, then the next batch does not occur, and cleanup/output gets delayed unnecessarily. This is true for all stateful streaming operators: aggregation, deduplication, joins, mapGroupsWithState.
> This issue tracks the work to enable no-data batches in MicroBatchExecution. The major challenge is that all the tests of the relevant stateful operations add dummy data to force another batch in order to test state cleanup, so a lot of tests will have to change. My plan is therefore to enable no-data batches for different stateful operators one at a time.
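
(Not part of the original description: a small illustrative sketch of the behaviour described above, using the built-in rate source and made-up interval values. In append mode a window is emitted exactly once, only after the watermark passes its end; before this change, the watermark could only advance when a batch containing new data ran.)
{code:java}
// Illustrative sketch (assumed values): a watermarked aggregation in append
// mode. Each 30-second window is finalized and emitted only after the
// watermark (max event time - 10s) passes the window end; without no-data
// batches, that finalization had to wait for a batch with new input.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object NoDataBatchDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[2]").appName("no-data-batch-demo").getOrCreate()
    import spark.implicits._

    val counts = spark.readStream
      .format("rate")                 // built-in test source: one row per second
      .option("rowsPerSecond", "1")
      .load()
      .withWatermark("timestamp", "10 seconds")
      .groupBy(window($"timestamp", "30 seconds"))
      .count()

    counts.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}
{code}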


