Posted to user@flink.apache.org by Bowen Li <bo...@offerupnow.com> on 2017/08/16 04:37:33 UTC

Which window function to use to start a window at anytime

Hi guys,

We are trying to use Flink to count millions of keyed items over one-hour windows,
emitted hourly, using `window(SlidingEventTimeWindows.of(Time.hours(1), Time.hours(1)))`. According to
the sliding window doc
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#sliding-windows>,
all windows are aligned with the epoch and always start on the hour, e.g.
1:00:00.000 - 1:59:59.999.
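
The epoch alignment comes from how a window's start is derived from each element's timestamp. The following is a minimal plain-Java sketch, assuming the same arithmetic as Flink's `TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)`. With offset 0, every timestamp maps to the top of its hour. The optional offset that Flink's window assigners accept (for example `TumblingEventTimeWindows.of(size, offset)`) shifts the grid for all keys at once, so it can match item A or item B, but not both. The class name `AlignedWindows` is hypothetical.

```java
import java.time.Instant;

// Hypothetical helper reproducing epoch-aligned window starts.
public class AlignedWindows {
    static final long HOUR_MS = 60L * 60L * 1000L;

    // Start of the window containing `timestamp` on a grid of `size`
    // milliseconds, aligned to the epoch shifted by `offset`.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2017-08-16T01:12:24.123Z").toEpochMilli();
        long start = windowStart(ts, 0L, HOUR_MS);
        long last = start + HOUR_MS - 1; // last millisecond inside the window
        System.out.println(Instant.ofEpochMilli(start) + " - " + Instant.ofEpochMilli(last));
    }
}
```

Choosing an offset of 12:24.123 makes the grid start exactly at item A's first event, but item B's event at 1:10:20.321 then falls into the 0:12:24.123 window. A single global offset cannot give every key its own start.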

But we actually want each item's hourly windows to start when that item's
first event arrives. For example,
- for item A, the first event arrives at 1:12:24.123, so the first window would
be 1:12:24.123 - 2:12:24.122, the next window would be 2:12:24.123 -
3:12:24.122, and so on;
- for item B, the first event arrives at 1:10:20.321, so the first window would
be 1:10:20.321 - 2:10:20.320, the next window would be 2:10:20.321 -
3:10:20.320, and so on.
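
The per-key behaviour described above amounts to remembering each item's first timestamp and measuring fixed-size windows from there. The following is a minimal plain-Java sketch with no Flink dependency. `PerKeyWindows` and `assign` are hypothetical names, and the in-memory `HashMap` stands in for per-key state. It also assumes that out-of-order events arriving before an item's first event belong to earlier windows on the same grid.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: fixed-size windows anchored at each key's first event.
public class PerKeyWindows {
    static final long HOUR_MS = 60L * 60L * 1000L;

    private final long size;
    // In-memory stand-in for per-key state: key -> timestamp of its first event.
    private final Map<String, Long> firstSeen = new HashMap<>();

    PerKeyWindows(long size) {
        this.size = size;
    }

    // Returns {start, lastMillisecond} of the window for this key and event time.
    long[] assign(String key, long timestamp) {
        long anchor = firstSeen.computeIfAbsent(key, k -> timestamp);
        // floorDiv keeps events earlier than the anchor on the same grid.
        long start = anchor + Math.floorDiv(timestamp - anchor, size) * size;
        return new long[] {start, start + size - 1};
    }

    public static void main(String[] args) {
        PerKeyWindows windows = new PerKeyWindows(HOUR_MS);
        String[][] events = {
            {"A", "2017-08-16T01:12:24.123Z"},
            {"B", "2017-08-16T01:10:20.321Z"},
            {"A", "2017-08-16T02:30:00.000Z"},
        };
        for (String[] e : events) {
            long[] w = windows.assign(e[0], Instant.parse(e[1]).toEpochMilli());
            System.out.println(e[0] + ": " + Instant.ofEpochMilli(w[0]) + " - " + Instant.ofEpochMilli(w[1]));
        }
    }
}
```

With this data, item A's 2:30:00.000 event should land in the 2:12:24.123 - 3:12:24.122 window, and item B keeps its own 1:10:20.321 boundary. The state map grows with the number of distinct keys, which is the cost discussed later in the thread.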

Do you have any insight into how to achieve this? Thanks!

Bowen

Re: Which window function to use to start a window at anytime

Posted by Bowen Li <bo...@offerupnow.com>.
Hi Aljoscha,
Thank you very much!

We suspected it would be very expensive to achieve that, and your
answer confirmed our understanding of how Flink works.

Regards,
Bowen



On Fri, Aug 25, 2017 at 8:18 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> I'm afraid this is not possible right now because it would require keeping
> state in the WindowAssigner (per key) about what the start timestamp for a
> specific key is.
>
> I think you could emulate that behaviour with a stateful FlatMap that
> keeps track of all keys and their respective start timestamps and assigns
> windows based on that. For this, you would emit a custom data type that
> carries the original data along with the assigned window. A custom
> WindowAssigner would then "extract" this window from each element. The
> downside is that you will have a lot of state, so you would need a way to
> clean it up. You could do this by using a ProcessFunction in which you set
> a cleanup timer for the per-key window-start state.
>
> Best,
> Aljoscha

Re: Which window function to use to start a window at anytime

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I'm afraid this is not possible right now because it would require keeping state in the WindowAssigner (per key) about what the start timestamp for a specific key is.

I think you could emulate that behaviour with a stateful FlatMap that keeps track of all keys and their respective start timestamps and assigns windows based on that. For this, you would emit a custom data type that carries the original data along with the assigned window. A custom WindowAssigner would then "extract" this window from each element. The downside is that you will have a lot of state, so you would need a way to clean it up. You could do this by using a ProcessFunction in which you set a cleanup timer for the per-key window-start state.

Best,
Aljoscha
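
Aljoscha's suggestion is to tag each event with its window in a keyed, stateful step, and then clean that state up with a timer. The logic can be simulated in plain Java and checked before it is wired into a job. This is a sketch under stated assumptions, not Flink code. `TaggingSketch`, `Tagged`, `process` and `onWatermark` are hypothetical names. The maps stand in for keyed state (in Flink, something like a `ValueState<Long>` per key). `onWatermark` stands in for event-time timers registered through the `ProcessFunction` context. The idle timeout is an assumed cleanup policy. Window ends are exclusive, as in Flink's `TimeWindow`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink dependency); all names are hypothetical.
public class TaggingSketch {

    // Original event plus the window assigned to it (end exclusive).
    static final class Tagged {
        final String key;
        final long timestamp;
        final long windowStart;
        final long windowEnd;

        Tagged(String key, long timestamp, long windowStart, long windowEnd) {
            this.key = key;
            this.timestamp = timestamp;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
        }
    }

    private final long size;
    private final long idleTimeout; // assumed cleanup policy
    private final Map<String, Long> anchor = new HashMap<>();    // per-key window-start state
    private final Map<String, Long> cleanupAt = new HashMap<>(); // per-key cleanup "timer"

    TaggingSketch(long size, long idleTimeout) {
        this.size = size;
        this.idleTimeout = idleTimeout;
    }

    // Stands in for the stateful FlatMap: tag the event with its per-key window.
    Tagged process(String key, long timestamp) {
        long first = anchor.computeIfAbsent(key, k -> timestamp);
        long start = first + Math.floorDiv(timestamp - first, size) * size;
        // Push the cleanup time forward; never move it backwards for late events.
        cleanupAt.merge(key, start + size + idleTimeout, Long::max);
        return new Tagged(key, timestamp, start, start + size);
    }

    // Stands in for event-time timers firing as the watermark advances.
    void onWatermark(long watermark) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> e : cleanupAt.entrySet()) {
            if (e.getValue() <= watermark) {
                expired.add(e.getKey());
            }
        }
        for (String key : expired) {
            cleanupAt.remove(key);
            anchor.remove(key);
        }
    }

    boolean hasState(String key) {
        return anchor.containsKey(key);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        TaggingSketch sketch = new TaggingSketch(hour, hour);
        Tagged t = sketch.process("A", 4_344_123L); // 01:12:24.123 after the epoch
        System.out.println(t.key + " -> [" + t.windowStart + ", " + t.windowEnd + ")");
        sketch.onWatermark(t.windowEnd + hour); // A stayed idle for the whole timeout
        System.out.println("state kept for A: " + sketch.hasState("A"));
    }
}
```

A custom `WindowAssigner` would then return the window carried by each tagged element instead of computing one. Once a key's state has been cleaned up, its next event starts a fresh window, which is consistent with starting a window whenever a new item arrives.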
