Posted to user@flink.apache.org by Aljoscha Krettek <al...@apache.org> on 2016/06/01 12:54:44 UTC

Re: Maintaining watermarks per key, instead of per operator instance

Hi,
yeah, in that case per-key watermarks would be useful for you. It won't be
possible to add such a feature, though, due to the (possibly) dynamic
nature of the key space and how watermark tracking works.

You should be able to implement it with relatively low overhead using a
RichFlatMapFunction and keyed state. This is the relevant section of the
doc:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#using-the-keyvalue-state-interface
.
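
A minimal sketch of the logic such a function could hold, written as plain
Java so it runs without Flink on the classpath. Everything here is an
assumption made for illustration: the names PerKeyWindowSketch, WindowState
and onElement are hypothetical, a HashMap stands in for Flink's keyed state,
windows are tumbling, timestamps are non-negative, and each key delivers its
readings in order (as described for the meters in the quoted message below).
In a real job the body of onElement would sit inside flatMap() of a
RichFlatMapFunction on a keyed stream, and the map would be replaced by
keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for a keyed, stateful flatMap that mimics tumbling windows per key. */
final class PerKeyWindowSketch {

    /** Running sum and count for the window one key is currently filling. */
    static final class WindowState {
        final long windowStart;
        double sum;
        long count;

        WindowState(long windowStart) {
            this.windowStart = windowStart;
        }
    }

    private final long windowSizeMs;

    // Stand-in for Flink keyed state: one WindowState per meter ID.
    private final Map<String, WindowState> stateByKey = new HashMap<>();

    PerKeyWindowSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Plays the role of flatMap(): consumes one reading and emits zero or one result. */
    List<String> onElement(String meterId, long timestampMs, double value) {
        List<String> out = new ArrayList<>();
        // Start of the tumbling window this reading falls into (timestamps assumed >= 0).
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        WindowState state = stateByKey.get(meterId);

        if (state != null && windowStart < state.windowStart) {
            // Violates the per-key in-order assumption; this sketch simply drops it.
            return out;
        }
        if (state != null && windowStart > state.windowStart) {
            // This key's own event time has left its window: emit the average.
            // Other keys are unaffected, which is the "per-key watermark" effect.
            out.add(meterId + "@" + state.windowStart + " avg=" + (state.sum / state.count));
            state = null;
        }
        if (state == null) {
            state = new WindowState(windowStart);
            stateByKey.put(meterId, state);
        }
        state.sum += value;
        state.count++;
        return out;
    }
}
```

Note that a key's last window is only emitted once a later reading for that
same key arrives, so results for a meter that stays offline are held back
rather than dropped. Whether that trade-off is acceptable depends on the
use case.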

We are also in the process of improving our windowing system, especially
when it comes to late data, cleanup and trigger semantics. You can have a
look here if you're interested:
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
.

Best,
Aljoscha

On Tue, 31 May 2016 at 14:36 <le...@tutanota.com> wrote:

> Hi Aljoscha,
>
> thanks for the speedy reply.
>
> I am processing measurements delivered by smart meters. I use windows to
> gather measurements and calculate values such as average consumption. The
> key is simply the meter ID.
>
> The challenge is that meters may undergo network partitioning, under which
> they fall back to local buffering. The data is then transmitted once
> connectivity has been re-established. I am using event time to obtain
> accurate calculations.
>
> If a specific meter goes offline, and the watermark progresses to the next
> window for an operator instance, then all late data will be discarded once
> that meter is online again, until it has caught up to the event time. This
> is because I am using a custom EventTimeTrigger implementation that
> discards late elements. The reason is that Flink would
> otherwise immediately evaluate the window upon receiving a late element,
> which is a problem since my calculations (e.g. the average consumption)
> depend on multiple elements. I cannot calculate averages with that single
> late element.
>
> Each individual meter guarantees in-order transmission of measurements. If
> watermarks progressed per key, then I would never have late elements
> because of that guarantee. I would be able to accurately calculate
> averages, with the trade-off that my results would arrive sporadically from
> the same operator instance.
>
> I suppose I could bypass the use of windows by implementing a stateful map
> function that mimics windows to a certain degree. I implemented something
> similar in Storm, but the amount of application logic required is
> substantial.
>
> I completely understand why Flink evaluates a window on a late element,
> since there is no other way to know when to evaluate the window as event
> time has already progressed.
>
> Perhaps there is a way to gather/redirect late elements?
>
> Regards
> Leon
>
> 31. May 2016 13:37 by aljoscha@apache.org:
>
>
> Hi,
> I'm afraid this is impossible with the current design of Flink. Might I
> ask what you want to achieve with this? Maybe we can come up with a
> solution.
>
> -Aljoscha
>
> On Tue, 31 May 2016 at 13:24 <le...@tutanota.com> wrote:
>
>> My use case primarily concerns applying transformations per key, with the
>> keys remaining fixed throughout the topology. I am using event time for my
>> windows.
>>
>> The problem I am currently facing is that watermarks in windows propagate
>> per operator instance, meaning the operator event time increases for all
>> keys that the operator is in charge of. I wish for watermarks to progress
>> per key, not per operator instance.
>>
>> Is this easily possible? I was unable to find an appropriate solution
>> based on existing code recipes.
>>
>> Greetings
>> Leon
>>
>
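
The lateness problem described in the quoted message (a meter reconnecting
after the operator's watermark has already moved on) can be illustrated with
a small plain-Java sketch. It is a deliberate simplification and not Flink's
actual watermark mechanism: the watermark is assumed to be the highest
timestamp seen so far, and a reading counts as late when its timestamp lies
behind that watermark. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Compares how many readings count as late under a shared vs. a per-key watermark. */
final class LatenessSketch {

    /** One watermark for all keys, as with one watermark per operator instance. */
    static int lateWithSharedWatermark(String[] keys, long[] timestamps) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (int i = 0; i < keys.length; i++) {
            if (timestamps[i] < watermark) {
                late++; // behind the shared watermark, whichever key advanced it
            } else {
                watermark = timestamps[i];
            }
        }
        return late;
    }

    /** One watermark per key: a reading is only late relative to its own key. */
    static int lateWithPerKeyWatermark(String[] keys, long[] timestamps) {
        Map<String, Long> watermarkByKey = new HashMap<>();
        int late = 0;
        for (int i = 0; i < keys.length; i++) {
            long watermark = watermarkByKey.getOrDefault(keys[i], Long.MIN_VALUE);
            if (timestamps[i] < watermark) {
                late++;
            } else {
                watermarkByKey.put(keys[i], timestamps[i]);
            }
        }
        return late;
    }
}
```

For example, if meter A sends a reading at time 1, goes offline while meter B
sends readings at times 2, 3 and 4, and then A delivers its buffered readings
for times 2, 3 and 5, the shared watermark marks A's readings at 2 and 3 as
late, while the per-key variant marks none.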

Re: Maintaining watermarks per key, instead of per operator instance

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think you first have to convert back to a DataStream using .select() or
.flatSelect(). But Till should know more about this, maybe he can help.

Cheers,
Aljoscha

On Thu, 2 Jun 2016 at 19:19 Kanstantsin Kamkou <kk...@gmail.com> wrote:

> Hi Aljoscha! Is it possible somehow to use the RichXFunction in CEP?
> The task is pretty similar, but I have to ignore once the next
> triggered event for the same key.

Re: Maintaining watermarks per key, instead of per operator instance

Posted by Kanstantsin Kamkou <kk...@gmail.com>.
Hi Aljoscha! Is it possible somehow to use the RichXFunction in CEP?
The task is pretty similar, but I have to ignore once the next
triggered event for the same key.



Re: Maintaining watermarks per key, instead of per operator instance

Posted by le...@tutanota.com.
Hi again Aljoscha,

understood. Thanks for the link. I really like the straightforward approach 
concerning storing state. It makes things very easy.

The improvements are very interesting, particularly the composite triggers. 
That would significantly improve flexibility.

Kind regards
Leon
