Posted to dev@beam.apache.org by Kenneth Knowles <kl...@google.com> on 2018/02/01 04:19:13 UTC

Re: Can Window PTransform drop tuples that violate allowed lateness?

On Mon, Jan 22, 2018 at 11:42 AM, Shen Li <cs...@gmail.com> wrote:

> Hi Kenn,
>
> Thanks for the explanation.
>
> > So now elements are droppable if they belong to an expired window.
>
> Say I have two consecutive Window transforms, each with a FixedWindows
> WindowFn (just an example; it most likely won't appear in a real pipeline).
> The first WindowFn says the element belongs to an expired window. But
> according to the second WindowFn, the element's window is not yet expired.
> In this case, can the first Window transform drop the element?
>

Yes, it is permitted to drop the expired data at any point. The reason I
think this is OK is that the runner also completely controls the watermark.
So there is arbitrary runner-owned behavior in terms of dropping either
way. It hasn't come up, since windows are hardly useful until you have an
aggregation, where they provide the notion of completeness. Do you have an
example in mind where this gets weird?

Kenn
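
To make the two-Window scenario above concrete, here is a minimal Java sketch. It is an illustration only and does not use the Beam SDK. The class and method names are hypothetical, windows are assumed to be epoch-aligned, and the expiry rule (a window expires once the watermark passes its last timestamp plus the allowed lateness) is an assumption about how a runner might decide expiry, not a statement of what any particular runner does.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative model of fixed-window assignment and expiry; not Beam SDK code. */
final class WindowExpirySketch {

    /** Start of the epoch-aligned fixed window of the given size containing the timestamp. */
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }

    /**
     * Assumed rule: the window [start, start + size) is expired when its last
     * timestamp plus the allowed lateness is strictly before the watermark.
     */
    static boolean isExpired(
            Instant start, Duration size, Duration allowedLateness, Instant watermark) {
        Instant maxTimestamp = start.plus(size).minusMillis(1);
        return maxTimestamp.plus(allowedLateness).isBefore(watermark);
    }

    public static void main(String[] args) {
        Instant element = Instant.parse("2018-01-22T12:00:30Z");
        Instant watermark = Instant.parse("2018-01-22T12:10:00Z");
        Duration lateness = Duration.ZERO;

        // First Window transform: one-minute fixed windows.
        Duration small = Duration.ofMinutes(1);
        boolean expiredInFirst =
                isExpired(windowStart(element, small), small, lateness, watermark);

        // Second Window transform: one-hour fixed windows.
        Duration large = Duration.ofHours(1);
        boolean expiredInSecond =
                isExpired(windowStart(element, large), large, lateness, watermark);

        System.out.println("expired under first WindowFn:  " + expiredInFirst);
        System.out.println("expired under second WindowFn: " + expiredInSecond);
    }
}
```

With the values in main, the same element falls in an expired window under the one-minute WindowFn and in a still-open window under the one-hour WindowFn, which is the case the question asks about.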




> Best,
> Shen
>
> On Mon, Jan 22, 2018 at 2:07 PM, Kenneth Knowles <kl...@google.com> wrote:
>
>> Hi Shen,
>>
>> This is a documentation issue. The Beam model switched from dropping
>> individual elements to expiring windows. So now elements are droppable if
>> they belong to an expired window. This works a little better with the
>> purpose of windowing and allowed lateness: to say when an aggregation is
>> "complete". Any element that reaches an aggregation before the
>> accumulator expires may still be included. Only after the whole window
>> expires do we drop further incoming elements for that window.
>>
>> Kenn
>>
>> On Mon, Jan 22, 2018 at 10:52 AM, Shen Li <cs...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The Window#withAllowedLateness(Duration) doc says "Any elements that
>>> are later than this as decided by the system-maintained watermark will be
>>> dropped". Can the runner safely discard a tuple that violates the allowed
>>> lateness in the Window operator? Or does it have to drop it in the
>>> downstream GBK operator, in case another Window transform in between
>>> overrides the allowed lateness (or other configuration)?
>>>
>>> Thanks,
>>> Shen
>>>
>>
>>
>

Re: Can Window PTransform drop tuples that violate allowed lateness?

Posted by Shen Li <cs...@gmail.com>.
Hi Kenn,

Thanks for the response.

Semantically, it is not causing any problem. I just want to make sure that
certain optimizations are valid. We want to drop an expired key and its
associated state once the latest boundary of all its windows falls behind
(watermark - allowedLateness). Initially, I was concerned that the state of
an expired key could become on-time again when a downstream Window transform
re-assigns windows, which would prevent non-aggregating operators from
discarding any state. That could become costly when the application sees an
evolving set of keys.

Best,
Shen
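
The key-expiry optimization described in this message can be sketched as follows. This is a minimal illustration in plain Java, not runner or Beam SDK code. The class KeyExpiryTracker and its methods are hypothetical, and string keys are an assumption made for brevity. It tracks the latest window boundary seen per key and evicts a key once that boundary falls behind (watermark - allowedLateness).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical per-key bookkeeping for state cleanup; not part of Beam. */
final class KeyExpiryTracker {
    private final Duration allowedLateness;
    // Latest window end boundary observed so far for each key.
    private final Map<String, Instant> latestWindowEnd = new HashMap<>();

    KeyExpiryTracker(Duration allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    /** Records that the key has an element in a window ending at windowEnd. */
    void observe(String key, Instant windowEnd) {
        // Keep only the later of the stored and the new boundary.
        latestWindowEnd.merge(key, windowEnd, (a, b) -> a.isAfter(b) ? a : b);
    }

    /** Removes and returns keys whose latest boundary is before (watermark - allowedLateness). */
    List<String> expire(Instant watermark) {
        Instant cutoff = watermark.minus(allowedLateness);
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Instant>> it = latestWindowEnd.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Instant> entry = it.next();
            if (entry.getValue().isBefore(cutoff)) {
                expired.add(entry.getKey());
                it.remove(); // the key's associated state could be discarded here
            }
        }
        return expired;
    }

    boolean isTracked(String key) {
        return latestWindowEnd.containsKey(key);
    }
}
```

A caller would invoke observe for each element and expire whenever the watermark advances. Because expired data may be dropped at any point, as stated earlier in the thread, such a tracker would not need to retain keys in case a downstream Window transform re-assigns their elements to newer windows.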
