Posted to user@flink.apache.org by Stephen Connolly <st...@gmail.com> on 2019/02/10 19:55:48 UTC

Is there a windowing strategy that allows a different offset per key?

I would like to process a stream of data from different customers,
producing output say once every 15 minutes. The results will then be loaded
into another system for storage and querying.

I have been using TumblingEventTimeWindows in my prototype, but I am
concerned that all the windows will start and stop at the same time and
cause batch load effects on the back-end data store.

What I think I would like is for the windows to have a different start
offset for each key (using a hash function that I would supply).

Thus, deterministically, key "ca:fe:ba:be" would always start based on an
initial offset of 00:07 UTC, while key "de:ad:be:ef" would always start
based on an initial offset of, say, 00:02 UTC.

Is this possible? Or do I just have to find some way of queuing up my
writes using back-pressure?

Thanks in advance

-stephenc

P.S. I can trade assistance with Flink for assistance with Maven or Jenkins
if my questions are too wearisome!

Re: Is there a windowing strategy that allows a different offset per key?

Posted by Rong Rong <wa...@gmail.com>.
Hi Stephen,

Yes, we had a discussion about dynamic offsets and keys [1]. The
motivation was the same: we don't have many complex operators after the
window operator, so a huge spike of traffic occurs when the windows fire on
the window boundary. The best idea to come out of that discussion was to
supply a custom *WindowAssigner*, in which you can implement your own
offset strategy.

The only caveat is that the *KeySelector* you use before windowing has to
be stateless (i.e. every invocation of *getKey(IN value)* with the same
input value should return an identical result). In your case, if the id
field is used to determine the offset, you can always do that by extracting
the id from the key tuple of (id, path).
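
To make the point about deriving the offset from the id alone concrete, here
is a minimal plain-Java sketch with no Flink dependency. The names IdPathKey,
IdOffsets and offsetFor are hypothetical, and the choice of CRC32 is only an
assumption: any hash that returns the same value for the same id on every JVM
and after every restart would do.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical composite key of (id, path), as described in the thread. */
final class IdPathKey {
    final String id;
    final String path;

    IdPathKey(String id, String path) {
        this.id = id;
        this.path = path;
    }
}

final class IdOffsets {
    /**
     * Returns an offset in [0, sizeMillis) that depends on the id only.
     * CRC32 over the UTF-8 bytes is an illustrative choice: the result does
     * not depend on the JVM instance or on Object.hashCode().
     */
    static long offsetFor(IdPathKey key, long sizeMillis) {
        CRC32 crc = new CRC32();
        crc.update(key.id.getBytes(StandardCharsets.UTF_8));
        // CRC32.getValue() is always non-negative, so plain % is safe here.
        return crc.getValue() % sizeMillis;
    }
}
```

With this, two keys that share an id but differ in path get the same offset,
so all windows for one id open and close together.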

Hope this helps.

Thanks,
Rong


[1]
https://docs.google.com/document/d/1fEhbcRgxxX8zFYD_iMBG1DCbHmTcTRfRQFXelPhMFiY/edit?usp=sharing


Re: Is there a windowing strategy that allows a different offset per key?

Posted by Stephen Connolly <st...@gmail.com>.
On Mon, 11 Feb 2019 at 09:54, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Stephen,
>
> First of all, yes, windows computing and emitting at the same time can
> cause pressure on the downstream system.
>
> There are a few ways you can achieve this:
> * use a custom window assigner. A window assigner decides into which
> window a record is assigned. This is the approach you suggested.
>

Thanks for the link. Yes, I think the custom window assigner is most
certainly the way to go for my use case, all the more so because the
offsets I want to use will be based on a subset of the assigned key, not
the full assigned key. (If you see my other mails this week, the key I
window on is a composite key of (id, path), but I want all the offsets for
any specific id to be the same, irrespective of the path. So the
theoretical need for access to the full key, which was driving Rong's
original idea for an RFE to the WindowAssignerContext, is not even
necessary for my case.)


> * use a regular window and add an operator that buffers the window results
> and releases them with randomized delay.
> * use a ProcessFunction which allows you to control the timing of
> computations yourself.
>
> A few months ago, there was a similar discussion on the dev mailing list
> [1] (didn't read the thread) started by Rong (in CC).
> Maybe, he can share some ideas / experiences as well.
>

Would be awesome if Rong can share any learnings he has encountered since
then.


>
> Cheers,
> Fabian
>
> [1]
> https://lists.apache.org/thread.html/0d1e41302b89378f88693bf4fdb52c23d4b240160b5a10c163d9c46c@%3Cdev.flink.apache.org%3E
>
>

Re: Is there a windowing strategy that allows a different offset per key?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Stephen,

First of all, yes, windows computing and emitting at the same time can
cause pressure on the downstream system.

There are a few ways you can achieve this:
* use a custom window assigner. A window assigner decides into which window
a record is assigned. This is the approach you suggested.
* use a regular window and add an operator that buffers the window results
and releases them with randomized delay.
* use a ProcessFunction which allows you to control the timing of
computations yourself.
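
The second option (buffer the window results and release them with a
randomized delay) could look roughly like the plain-Java sketch below. It is
only an illustration of the idea: JitterBuffer, add and drain are hypothetical
names, the caller passes the current time in explicitly, and in a real Flink
job the buffer would have to be kept in checkpointed state and driven by
timers, which is not shown here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

/** Holds results and releases each one after a random delay below maxDelayMillis. */
final class JitterBuffer<R> {

    private static final class Entry<R> {
        final long releaseAt;
        final R result;

        Entry(long releaseAt, R result) {
            this.releaseAt = releaseAt;
            this.result = result;
        }
    }

    // Ordered by release time, so the next result due is always at the head.
    private final PriorityQueue<Entry<R>> queue =
            new PriorityQueue<>(Comparator.comparingLong((Entry<R> e) -> e.releaseAt));
    private final long maxDelayMillis;
    private final Random random;

    JitterBuffer(long maxDelayMillis, Random random) {
        this.maxDelayMillis = maxDelayMillis;
        this.random = random;
    }

    /** Buffers a result produced at nowMillis with a delay in [0, maxDelayMillis). */
    void add(R result, long nowMillis) {
        long delay = (long) (random.nextDouble() * maxDelayMillis);
        queue.add(new Entry<>(nowMillis + delay, result));
    }

    /** Removes and returns every buffered result whose release time has been reached. */
    List<R> drain(long nowMillis) {
        List<R> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().releaseAt <= nowMillis) {
            due.add(queue.poll().result);
        }
        return due;
    }
}
```

Compared with a custom window assigner, this keeps the window boundaries
aligned and only spreads out the writes, at the cost of extra buffering and
added latency of up to maxDelayMillis.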

A few months ago, there was a similar discussion on the dev mailing list
[1] (didn't read the thread) started by Rong (in CC).
Maybe, he can share some ideas / experiences as well.

Cheers,
Fabian

[1]
https://lists.apache.org/thread.html/0d1e41302b89378f88693bf4fdb52c23d4b240160b5a10c163d9c46c@%3Cdev.flink.apache.org%3E



Re: Is there a windowing strategy that allows a different offset per key?

Posted by Stephen Connolly <st...@gmail.com>.
Looking into the code in TumblingEventTimeWindows:

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Long.MIN_VALUE is currently assigned when no timestamp is present
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException(
                    "Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

So I think I can just write my own where the offset is derived from hashing
the element using my hash function.

Good plan or bad plan?
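
For illustration, the plan above (tumbling windows whose offset is derived
from the element) can be sketched in plain Java without any Flink classes.
This is not Flink's implementation: OffsetTumblingWindows and assign are
hypothetical names, the window is returned as a {start, end} pair instead of
a TimeWindow, and the start arithmetic uses Math.floorMod as a stand-in for
TimeWindow.getWindowStartWithOffset.

```java
import java.util.function.ToLongFunction;

/** Tumbling windows of a fixed size whose start offset is computed per element. */
final class OffsetTumblingWindows<T> {
    private final long sizeMillis;
    private final ToLongFunction<T> offsetFn; // user-supplied hash, any long value

    OffsetTumblingWindows(long sizeMillis, ToLongFunction<T> offsetFn) {
        this.sizeMillis = sizeMillis;
        this.offsetFn = offsetFn;
    }

    /** Returns {start, end} (end exclusive) of the window containing the timestamp. */
    long[] assign(T element, long timestampMillis) {
        if (timestampMillis == Long.MIN_VALUE) {
            throw new IllegalArgumentException("element has no timestamp");
        }
        // Normalise the hash into [0, sizeMillis) so negative hashes are fine.
        long offset = Math.floorMod(offsetFn.applyAsLong(element), sizeMillis);
        // Windows are aligned to offset + n * sizeMillis for integer n.
        long start = timestampMillis - Math.floorMod(timestampMillis - offset, sizeMillis);
        return new long[] {start, start + sizeMillis};
    }
}
```

For 15-minute windows one might construct it as
new OffsetTumblingWindows<String>(15 * 60 * 1000L, id -> id.hashCode()).
The offset function must return the same value every time it sees the same
element key, otherwise records for one key would land in misaligned windows.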

