You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by "stevech.hu" <st...@outlook.com> on 2020/01/18 08:57:27 UTC

How to prevent and track data loss/dropped due to watermark during structure streaming aggregation

We have a scenario to group raw records by correlation id every 3 minutes and
append groupped result to some HDFS store, below is an example of our query

    val df= records.readStream.format("SomeDataSource")
       .selectExpr("current_timestamp() as CurrentTime", "*")
      .withWatermark("CurrentTime", "2 minute")
      .groupBy(window($"CurrentTime", "3 minute"), $"CorrelationId")
      .agg(collect_list(col("data")) as "Records")
      .repartition(100, $"CorrelationId")
      .select($"CorrelationId", $"Records")
      .writeStream.....

We want include delayed data even if there is processing delay in the
pipeline, and have the SLA of 5 minutes meaning once any record is read into
spark, we want to see the groupped output flush to hdfs within 5 minutes.

So, let's say during shuffle stage (groupby) or write stage, we have a delay
of 5 to 10 minutes, will we lose data due to watermark of 2 minutes here?
(sometimes it is ok to break SLA but we cannot afford data loss)   If so,
how can we prevent data loss or track the amount of data is being dropped in
this case?  

Note that, extending watermark to longer windows won't work in our append
scenario, because aggregate data won't be output to write stage until the
watermark timer is up.

Thanks,
Steve


  





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: How to prevent and track data loss/dropped due to watermark during structure streaming aggregation

Posted by "stevech.hu" <st...@outlook.com>.
Thanks Jungtaek. I could not remove the watermark but setting 0 works for me. 




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: How to prevent and track data loss/dropped due to watermark during structure streaming aggregation

Posted by Jungtaek Lim <ka...@gmail.com>.
Have you try out printing timestamp for rows in each batch and watermark
while you add artificial delay on processing batch?

First of all, you're technically using "processing time" in your query,
where you will never have "late events" theoretically. Watermark is to
handle out-of-order events and you won't need it.

If Spark requires the watermark due to the technical reason, you can just
set it to 0 and any events shouldn't be lost.

> So, let's say during shuffle stage (groupby) or write stage, we have a
delay of 5 to 10 minutes, will we lose data due to watermark of 2 minutes
here?

If your batch is being delayed, the timestamp in the data will be also
delayed as the notion of "processing time". No data will be lost, but
as you're relying on processing time, the result can be affected by various
circumstances.

3 mins of window and 5 to 10 mins of batch delay would lead the grouping
only applied within a batch. Applying watermark here doesn't help the
situation but just slows down the output unnecessarily.

That's the power of "event time" processing. You'll have consistent result
even in delay, out-of-order events, etc. whereas the issue you've describe
actually applies to "event time" processing (delayed output vs discarded
late events).

Hope this helps.
Jungtaek Lim (HeartSaVioR)

On Fri, Jan 24, 2020 at 7:19 AM stevech.hu <st...@outlook.com> wrote:

> Anyone know the answers or pointers? thanks.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>

Re: How to prevent and track data loss/dropped due to watermark during structure streaming aggregation

Posted by "stevech.hu" <st...@outlook.com>.
Anyone know the answers or pointers? thanks.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org