Posted to dev@beam.apache.org by Shen Li <cs...@gmail.com> on 2018/03/12 18:55:15 UTC

Flatten input data streams with skewed watermark progress

If the inputs of a Flatten proceed at different speeds, should the
Flatten operator cache tuples before emitting output watermarks? That would
prevent a late tuple from becoming early. But if the watermark gap among
inputs (and hence the cache size) becomes too large, can the application
tell Beam / the runner to emit the output watermark anyway and treat tuples
from the slow inputs as late?

Thanks,
Shen

Re: Flatten input data streams with skewed watermark progress

Posted by Shen Li <cs...@gmail.com>.
Thomas and Reuven,

Thank you for the explanation.

Shen


Re: Flatten input data streams with skewed watermark progress

Posted by Thomas Groh <tg...@google.com>.
That one would be, for example, having a PCollection with a highly advanced
watermark and a PCollection with a much earlier watermark, and having an
element that is behind the watermark of the former PCollection go through
the Flatten - at which point it moves to being ahead of the (combined,
minimum) output watermark.

That's fine, because one of two things happens in practice:
* Either the upstream contains a GroupByKey, in which case the element will
be dropped there if its window is expired;
* Or the upstream does not contain a GroupByKey, which means the element
never appeared at such a grouping behind the watermark, so its final window
was never expired before the element arrived at the first downstream
GroupByKey.

Specifically, we're concerned about GroupByKeys because that's the point at
which we become certain of the window the element is within, and whether
that window is expired; before that point, we can't claim with certainty
which final window the element will be assigned to.
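
To make the "dropped if the window is expired" part concrete, the check
that's effectively applied at a GroupByKey is roughly the following (a
conceptual sketch only, not actual runner code - real runners track this
through window state and garbage-collection timers):

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

class LatenessSketch {
  // An element is only droppably late at a GroupByKey, where both its final
  // window and the local input watermark are known. The window is "expired"
  // once the watermark passes the window's max timestamp plus the allowed
  // lateness; only then is the element discarded.
  static boolean shouldDropAtGroupByKey(
      BoundedWindow window, Instant inputWatermarkAtGbk, Duration allowedLateness) {
    Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
    return inputWatermarkAtGbk.isAfter(windowExpiry);
  }
}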


Re: Flatten input data streams with skewed watermark progress

Posted by Reuven Lax <re...@google.com>.
Ah, that's fine, I think. What's not fine is for an on-time element to
later turn into a late element.


Re: Flatten input data streams with skewed watermark progress

Posted by Shen Li <cs...@gmail.com>.
Sure. Consider the following case, where I have two input streams A and B.
(ts = timestamp, wm = watermark)

processing time        stream A            stream B

0                      elem=x, ts=1        wm=3
1                      wm=1                elem=y, ts=2
2                      wm=2                elem=z, ts=4

In stream B, {elem=y, ts=2} is late, as its timestamp (2) falls behind the
watermark (3).

However, the Flatten output would be: {elem=x, ts=1}, {wm=1}, {elem=y,
ts=2}, {wm=2}, {elem=z, ts=4}, where {elem=y, ts=2} is no longer late due
to the slower watermark progress in stream A.
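
Spelling out the comparison in plain Java (nothing Beam-specific; the
numbers are just the ones from the table above):

public class FlattenLatenessExample {
  public static void main(String[] args) {
    long tsY = 2;                        // event-time timestamp of elem=y (stream B)
    long wmA = 1;                        // stream A watermark when y is processed
    long wmB = 3;                        // stream B watermark when y is processed
    long flattenWm = Math.min(wmA, wmB); // Flatten output watermark = min over inputs

    System.out.println("late w.r.t. stream B's watermark:  " + (tsY < wmB));       // true
    System.out.println("late w.r.t. Flatten output wm:     " + (tsY < flattenWm)); // false
  }
}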

Is the above a valid scenario?

Shen


Re: Flatten input data streams with skewed watermark progress

Posted by Reuven Lax <re...@google.com>.
Logically a Flatten is just a way to create a multi-input transform
downstream of the flatten (you can imagine a model in which Flatten was not
explicit and we simply allowed multiple main inputs). This means that yes,
the output watermark is the minimum of all the input watermarks.
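
In pseudo-code terms (a conceptual sketch of the model, not what any
particular runner literally does):

import java.util.Collections;
import java.util.List;

class FlattenWatermarkSketch {
  // The output watermark a Flatten (or any multi-input transform) can report
  // is the minimum of its inputs' watermarks. Since min(w_1..w_n) <= w_i for
  // every input i, an element that was ahead of its own input's watermark is
  // still ahead of the combined output watermark: the merge can make an
  // element "more early", but never turn an on-time element into a late one.
  static long outputWatermark(List<Long> inputWatermarks) {
    return Collections.min(inputWatermarks);
  }
}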

I don't see how a late tuple can become early. Can you explain?


Re: Flatten input data streams with skewed watermark progress

Posted by Shen Li <cs...@gmail.com>.
Hi Reuven,

What about watermarks? Should Flatten emit the min watermark of all input
data streams? If that is the case, a late tuple can become early after the
Flatten, right? Will that cause any problem?

Shen


Re: Flatten input data streams with skewed watermark progress

Posted by Reuven Lax <re...@google.com>.
No, I don't think it makes sense for the Flatten operator to cache elements.

