Posted to dev@spark.apache.org by Anastasios Zouzias <zo...@gmail.com> on 2019/11/13 09:57:47 UTC

[Structured Streaming] Robust watermarking calculation with future timestamps

Hi all,

We currently have the following issue with a Spark Structured Streaming
(SS) application. The application reads messages from thousands of source
systems, stores them in Kafka, and Spark aggregates them using SS with a
15-minute watermark.
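
For concreteness, the pipeline looks roughly like this (a sketch only,
assuming a SparkSession `spark` in scope; the topic and column names
below are made up, not our real job):

    import org.apache.spark.sql.functions._

    // Read from Kafka and use the Kafka message timestamp as event time.
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS payload", "timestamp AS eventTime")

    // 15-minute watermark, then a count per 5-minute event-time window.
    val counts = events
      .withWatermark("eventTime", "15 minutes")
      .groupBy(window(col("eventTime"), "5 minutes"))
      .count()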

The root problem is that a few of the source systems have an incorrect
timezone setup that makes them emit messages from the future, i.e., one
hour ahead of the current time (a misconfiguration, or the winter/summer
timezone change (yeah!)).
Since the watermark is calculated as

(maximum event timestamp across all messages) - (watermark threshold,
here 15 minutes),

most of the messages are dropped, because they appear to be delayed by
more than 45 minutes. In the most extreme scenario, even a single
"future" / adversarial message can make the structured streaming
application report zero messages (per mini-batch).
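
To make the arithmetic concrete: with wall-clock time at 10:00 and a
15-minute threshold, a single message stamped 11:00 advances the
watermark to 10:45, so every on-time message (stamped around 10:00)
looks 45 minutes late and gets dropped.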

Is there any user-exposed SS API that allows a more robust calculation of
the watermark, e.g., the 95th percentile of timestamps instead of the max
timestamp? I understand that such a calculation would be more expensive,
but it would make the application more robust.

Any suggestions/ideas?

PS. Of course the best approach would be to fix the issue on all source
systems, but that might take time (or perhaps we could drop future
messages programmatically (yikes); a rough sketch follows).
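
Something along these lines, reusing the column name from the sketch
above:

    import org.apache.spark.sql.functions._

    // Drop any message stamped later than the current processing time.
    val sane = events.filter(col("eventTime") <= current_timestamp())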

Best regards,
Anastasios

Re: [Structured Streaming] Robust watermarking calculation with future timestamps

Posted by Jungtaek Lim <ka...@gmail.com>.
(dropping user@, as cross-posting a thread to both mailing lists would
bother both, and this seems more appropriate for dev@)

AFAIK there's no API for a custom watermark, and you're right that
picking the max timestamp introduces the issues you described. Other
streaming frameworks may pick the min timestamp by default, which has its
own trade-offs: a slower-advancing watermark, or getting stuck on skewed
data.

As a workaround for now, you can adjust the timestamp column before
calling withWatermark so that future events are pulled back, though that
doesn't give you something like a 95th percentile, which requires an
aggregated calculation. I guess the Spark community may consider adding
the feature if it sees more requests for this.
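
For example, clamping future timestamps to the current processing time
before defining the watermark (a rough sketch; `df` and the column name
"eventTime" are placeholders for your actual stream):

    import org.apache.spark.sql.functions._

    // Pull "future" event times back to now so a single bad clock cannot
    // drag the watermark forward.
    val adjusted = df
      .withColumn("eventTime", least(col("eventTime"), current_timestamp()))
      .withWatermark("eventTime", "15 minutes")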

Thanks,
Jungtaek Lim (HeartSaVioR)

