Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2016/08/01 13:31:59 UTC

Re: how does flink assign windows to task

Yes, you're right, Sameer. That's how things work in Flink.

Cheers,
Till
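
The key-to-slot assignment and the per-slot panes discussed in the quoted
messages below can be illustrated with a small simulation. This is only a
sketch under stated assumptions: CRC32 modulo the parallelism stands in for
Flink's real partitioning function, and the names slot_for_key, assign_keys
and panes_per_slot are hypothetical helpers, not Flink APIs. With a hash,
2 keys per slot is the average for 100 keys and 50 slots; an individual slot
may receive more, fewer, or none.

```python
import zlib
from collections import defaultdict


def slot_for_key(key: str, parallelism: int) -> int:
    """Stand-in for hash partitioning: deterministic hash modulo parallelism.

    Assumption: CRC32 is used only for illustration, it is not Flink's hash.
    """
    return zlib.crc32(key.encode("utf-8")) % parallelism


def assign_keys(keys, parallelism):
    """Group keys by the slot index they hash to."""
    slots = defaultdict(list)
    for key in keys:
        slots[slot_for_key(key, parallelism)].append(key)
    return slots


def panes_per_slot(slots, windows):
    """A pane is one (key, window) pair; each slot holds only its own panes."""
    return {
        slot: [(key, window) for key in keys for window in windows]
        for slot, keys in slots.items()
    }


keys = [f"key-{i}" for i in range(100)]
slots = assign_keys(keys, parallelism=50)
panes = panes_per_slot(slots, windows=["w0"])  # one window, as in the example

# Within a slot the panes are checked one after another; across slots the
# checks run in parallel, so no pane waits for a slot other than its own.
print("total panes:", sum(len(p) for p in panes.values()))
print("slots holding at least one key:", len(slots))
print("most keys in a single slot:", max(len(k) for k in slots.values()))
```

The last two printed numbers show how far the actual distribution is from an
even 2 keys per slot for this particular stand-in hash.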

On Sun, Jul 31, 2016 at 12:33 PM, Sameer Wadkar <sa...@axiomine.com> wrote:

> Vishnu,
>
> I would imagine, based on Max's explanation and on how other systems like
> MapReduce and Spark partition keys, that if you have 100 keys and 50 slots,
> 2 keys would be assigned to each slot. Each slot would maintain one or more
> windows (more for time-based windows), and each window would have up to 2
> panes (depending on whether there are elements for a key in a given
> window). The trigger would evaluate which of these panes will fire for a
> global window (count windows), or which window as a whole fires for a time
> window.
>
> It seems like this is the only way to get the most efficient utilization
> of the entire cluster and to allow all keys to be evaluated simultaneously,
> without being starved by keys that receive more elements in case of skew.
>
> So I think you will need to have enough memory to hold all the elements
> that can arrive for all the active windows (not yet triggered) for two keys
> in a task. For count windows this is easy to estimate. But for time windows
> it is less clear if you receive elements out of order.
>
> Let's see what Max replies. I am just reasoning about how Flink should
> work, based on how other similar systems do it.
>
> Sameer
>
>
> On Jul 29, 2016, at 9:51 PM, Vishnu Viswanath <
> vishnu.viswanath25@gmail.com> wrote:
>
> Hi Max,
>
> Thanks for the explanation.
>
> "This happens one after another in a single task slot but in parallel
> across all the task slots".
> Could you explain more about how this happens in parallel? Which part
> occurs in parallel? Is it the Trigger going through each pane and the
> window function being executed?
> As in the first example, if there are 100 Panes (since I have 1 window and
> 100 keys), will the trigger go through these 100 Panes using 50 task slots
> and then execute whichever fires? Does that mean that Flink determines
> which set of Panes has to be evaluated in each task slot, and then the
> trigger goes through them?
>
> The reason I am trying to understand exactly how it works is that I need
> to decide how much memory each node in my cluster should have. I know that
> a single pane would not cause an OOM in my case (since the number of
> elements per pane is not huge), but nodes might not have enough memory to
> hold the entire window in memory (since I can have a large number of Panes).
>
> Thanks,
> Vishnu
>
>
> On Fri, Jul 29, 2016 at 4:29 AM, Maximilian Michels <mx...@apache.org>
> wrote:
>
>> Hi Vishnu Viswanath,
>>
>> The keyed elements are spread across the 50 task slots (assuming you
>> have a parallelism of 50) using hash partitioning on the keys. Each
>> task slot runs one or multiple operators (depending on the slot
>> sharing options). One of them is a WindowOperator which will decide
>> when to trigger and process your keyed elements.
>>
>> The WindowOperator holds the WindowAssigner and the Trigger. The
>> WindowAssigner will determine which window an incoming element gets
>> assigned to. Windows are kept for each key; the combination of window
>> and key is usually called a Pane. The Trigger will go through all the
>> Panes and check if they should fire or not (whether the window function
>> should be executed). This happens one after another in a single task
>> slot but in parallel across all the task slots.
>>
>> Just a brief explanation. Hope it helps :)
>>
>> Cheers,
>> Max
>>
>> On Thu, Jul 28, 2016 at 5:49 PM, Vishnu Viswanath
>> <vi...@gmail.com> wrote:
>> > Hi,
>> >
>> > Let's say I have a window on a keyed stream, and I have about 100 unique
>> > keys.
>> > And assume I have about 50 task slots in my cluster. And suppose my
>> > trigger fired 70/100 windows/panes at the same time.
>> >
>> > How will Flink handle this? Will it assign 50/70 triggered windows to
>> > the 50 available task slots and wait for 20 of them to finish before
>> > assigning the remaining 20 to the slots?
>> >
>> > Thanks,
>> > Vishnu Viswanath
>>
>
>

Re: how does flink assign windows to task

Posted by Vishnu Viswanath <vi...@gmail.com>.
Thanks Sameer and Till,
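
For the memory sizing question raised in this thread, a back-of-the-envelope
estimate can be written down as a product of four factors. This is a rough
sketch, not a sizing formula from Flink: it assumes every element is buffered
until its window fires, and the function name and all input numbers below are
hypothetical placeholders to be replaced with measured values.

```python
def estimate_slot_state_bytes(keys_in_slot: int,
                              active_windows_per_key: int,
                              elements_per_pane: int,
                              bytes_per_element: int) -> int:
    """Rough upper bound on buffered window state held by one task slot.

    Assumption: all elements of every pane that has not fired yet are kept
    in memory; state backend and serialization overhead are ignored.
    """
    panes = keys_in_slot * active_windows_per_key  # one pane per (key, window)
    return panes * elements_per_pane * bytes_per_element


# Hypothetical inputs: 2 keys in the slot, 3 windows open at once per key
# (for example because of out-of-order elements), 10,000 elements per pane,
# 200 bytes per element.
estimate = estimate_slot_state_bytes(2, 3, 10_000, 200)
print(estimate)  # 12000000 bytes, about 12 MB for this slot
```

For count windows elements_per_pane is bounded by the count; for time windows
it depends on the arrival rate and on how late elements can be, which is why
that case is harder to estimate.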

