Posted to user@flink.apache.org by PedroMrChaves <pe...@gmail.com> on 2016/11/09 16:46:50 UTC

Processing streams of events with unpredictable delays

Hello,

I have a stream source of events. Each event is assigned a timestamp by the
machine that generated it, and those events are then retrieved by other
machines (collectors). Finally, the collectors send the events to Flink. In
Flink, when I receive those events, I extract their timestamps and process
them in a windowed fashion.

The problem is that the arrival delay of the events is unpredictable because
the collectors can fail. When a collector fails and restarts, it resumes
sending the events that it had not sent before the failure, so those events
can be delayed by many hours or days (depending on how long the collector
was down).

I am trying to think of a way to process those delayed events. As a first
approach, I could allow an arbitrary lateness (when assigning watermarks),
so that an event that arrives late can still be processed if it is within
the maximum lateness. The problem is that the collectors are very
unpredictable, and I can't set a lateness of several days because the
memory consumption would keep growing.
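
To make the memory concern concrete, here is a rough plain-Java model of how
many windows have to be kept around for a given allowed lateness. It is not
Flink code: the class and method names are made up for illustration, and the
cleanup rule (state is dropped once the watermark reaches the window's last
timestamp plus the allowed lateness) is a simplifying assumption.

```java
import java.time.Duration;

/** Simplified plain-Java model of event-time window retention; this is not Flink code. */
class LatenessSketch {

    /** Start of the tumbling window that contains the given (non-negative) timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /**
     * Assumed cleanup rule: a window's state is dropped once the watermark
     * reaches the window's last timestamp plus the allowed lateness.
     */
    static boolean isExpired(long startMs, long windowSizeMs,
                             long allowedLatenessMs, long watermarkMs) {
        long lastTimestamp = startMs + windowSizeMs - 1;
        return watermarkMs >= lastTimestamp + allowedLatenessMs;
    }

    /** Counts the windows (per key) that must still be kept at the given watermark. */
    static long retainedWindows(long windowSizeMs, long allowedLatenessMs, long watermarkMs) {
        long count = 0;
        long start = windowStart(watermarkMs, windowSizeMs);
        // Walk backwards from the current window until we hit an expired one.
        while (start >= 0 && !isExpired(start, windowSizeMs, allowedLatenessMs, watermarkMs)) {
            count++;
            start -= windowSizeMs;
        }
        return count;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(1).toMillis();
        long watermark = Duration.ofDays(3).toMillis();
        System.out.println(retainedWindows(size, 0L, watermark));
        System.out.println(retainedWindows(size, Duration.ofDays(2).toMillis(), watermark));
    }
}
```

In this model, one-minute windows with no allowed lateness keep a single
window per key, while two days of allowed lateness keep 2881 windows per key,
which is why a lateness of several days gets expensive.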

So I'm trying to figure out a way to recover the events when a collector
stops and restarts. All the events that arrive at my Flink job are stored in
persistent storage, so if a collector restarts, I can retrieve the events
that belong to the same time window as the late events. The problem is that I
need to process those late events in the same way I would if they were
arriving on time, but I don't know how I can do that with Flink, or if it's
even possible.

Depicted in the figure below is an example of my use case.

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n10016/EventLatenessProblem.png> 

Events A, B, C, D, E, F, G arrive on time. Then the collector fails, and
when it restarts it sends the events H, I, J, K, L, M, which were generated
much earlier than the current time.

Regards,
Pedro Chaves



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Processing-streams-of-events-with-unpredictable-delays-tp10016.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Processing streams of events with unpredictable delays

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
in addition to what you mentioned (having a very large allowed lateness),
you can also try another approach: adding a custom operator in front of the
window operation that splits the stream into normal elements and very late
elements. Then, for the stream of very late elements, you add some custom
logic that tries to read the result for the correct window from an
external data store and incorporates the late element into that result. You
would have to implement your own OneInputStreamOperator for this, however,
because only there can you directly deal with timestamps and watermarks.
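
As a rough illustration of the routing idea only, here is a plain-Java model.
It is not a Flink operator and does not implement OneInputStreamOperator; the
class LateEventRouter, its fields, the HashMap standing in for the external
data store, and the use of an event count as the window result are all
assumptions made for the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of splitting normal and very late elements; not a Flink operator. */
class LateEventRouter {
    private final long windowSizeMs;
    private final long maxLatenessMs;
    private long currentWatermark = Long.MIN_VALUE;

    /** Stand-in for the external data store: window start -> stored event count. */
    final Map<Long, Long> externalStore = new HashMap<>();
    /** Timestamps that would be forwarded to the regular window operation. */
    final List<Long> forwardedToWindow = new ArrayList<>();

    LateEventRouter(long windowSizeMs, long maxLatenessMs) {
        this.windowSizeMs = windowSizeMs;
        this.maxLatenessMs = maxLatenessMs;
    }

    /** Watermarks only move forward. */
    void onWatermark(long watermarkMs) {
        currentWatermark = Math.max(currentWatermark, watermarkMs);
    }

    /** Routes one event by comparing its window against the current watermark. */
    void onEvent(long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        long lastTimestamp = start + windowSizeMs - 1;
        boolean veryLate = currentWatermark >= lastTimestamp + maxLatenessMs;
        if (veryLate) {
            // Read the stored result for that window and fold the late element in.
            externalStore.merge(start, 1L, Long::sum);
        } else {
            forwardedToWindow.add(timestampMs);
        }
    }

    public static void main(String[] args) {
        LateEventRouter router = new LateEventRouter(60_000L, 300_000L);
        router.externalStore.put(0L, 7L);   // result written earlier for window [0, 60000)
        router.onWatermark(1_000_000L);
        router.onEvent(1_000_500L);         // recent: goes to the window operation
        router.onEvent(30_000L);            // very late: merged into the stored result
        System.out.println(router.externalStore + " " + router.forwardedToWindow);
    }
}
```

In a real job the threshold check would use the operator's current watermark,
and the merge would be a read-modify-write against whatever store holds the
window results.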

Cheers,
Aljoscha
