Posted to dev@beam.apache.org by Lukasz Cwik <lc...@google.com> on 2019/04/11 16:27:23 UTC

[DISCUSS] Side input consistency guarantees for triggers with multiple firings

Today, we define that a side input becomes available to be consumed once at
least one firing occurs or when the runner detects that no such output
could be produced (e.g. watermark is beyond the end of the window when
using the default trigger). For triggers that fire at most once, consumers
are guaranteed to have a consistent view of the contents of the side input.
But what happens when the trigger fires multiple times?

Let's say we have a pipeline containing:
ParDo(A) --> PCollectionView S
         \-> PCollectionView T

  ...
   |
ParDo(C) <-(side input)- PCollectionView S and PCollectionView T
   |
  ...

1) Let's say ParDo(A) outputs (during a single bundle) X and Y to
PCollectionView S. Should ParDo(C) be guaranteed to see X only if it
can also see Y (and vice versa)?

2) Let's say ParDo(A) outputs (during a single bundle) X to PCollectionView
S and Y to PCollectionView T. Should ParDo(C) be guaranteed to see X only
if it can also see Y?

Re: [DISCUSS] Side input consistency guarantees for triggers with multiple firings

Posted by Lukasz Cwik <lc...@google.com>.
I understand that triggers firing multiple times per window tend to be
non-deterministic, but here is an example use case. A pipeline reads Pub/Sub
messages that describe account links. Each message names two accounts that
are linked together, so the user produces a KV<AccountA, AccountB> and a
KV<AccountB, AccountA> and outputs both to a multimap PCollectionView.
Another portion of the pipeline consumes account update messages from a
different Pub/Sub topic and makes sure that updates are applied to all
linked accounts. Can the author of the pipeline rely on the multimap
containing a consistent view of these bidirectional mappings?
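
To make the concern concrete, here is a minimal sketch in plain Python (it
does not use the Beam API). It assumes a hypothetical runner that materializes
each output element of the multimap side input as a separate update.
SideInputStore, publish, snapshot, and is_symmetric are illustrative names,
not Beam classes. A consumer that reads between the two updates sees A linked
to B but not B linked to A.

```python
from collections import defaultdict


class SideInputStore:
    """Toy model of a multimap side input that is updated one element at a time."""

    def __init__(self):
        self._data = defaultdict(set)

    def publish(self, key, value):
        # Hypothetical: each call models one materialized update to the view.
        self._data[key].add(value)

    def snapshot(self):
        # What a consumer such as ParDo(C) would observe at this moment.
        return {k: frozenset(v) for k, v in self._data.items()}


def is_symmetric(view):
    """True if every link a -> b in the view has a matching link b -> a."""
    return all(a in view.get(b, ()) for a, linked in view.items() for b in linked)


store = SideInputStore()
store.publish("AccountA", "AccountB")   # first half of the link becomes visible
between = store.snapshot()              # a consumer reads here
store.publish("AccountB", "AccountA")   # second half arrives in a later update
after = store.snapshot()

print(is_symmetric(between))  # False
print(is_symmetric(after))    # True
```

Whether a real runner can expose the intermediate state is exactly the open
question; the sketch only shows what the pipeline author would observe if it
can.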

On Thu, Apr 11, 2019 at 9:44 AM Reuven Lax <re...@google.com> wrote:

> One thing to keep in mind: triggers that fire multiple times per window
> already tend to be non-deterministic. These are element-count or
> processing-time triggers, both of which are fairly non-deterministic in
> firing.
>
> Reuven

Re: [DISCUSS] Side input consistency guarantees for triggers with multiple firings

Posted by Kenneth Knowles <ke...@apache.org>.
What I dislike about all this is that a main value Beam (and similar
systems) brings to users is removing the concerns of classical concurrent
programming. But Luke's example convinces me that we might need to have a
discussion around a causality-based consistency model.

On Fri, Apr 12, 2019 at 9:58 AM Lukasz Cwik <lc...@google.com> wrote:

> Yes, if we had such a pipeline:
> ParDo(A) --> PCollectionView S
>
>   ...
>    |
> ParDo(C) <-(side input)- PCollectionView S
>    |
>   ...
>    |
> ParDo(D) <-(side input)- PCollectionView S
>    |
>   ...
>
> We could reason that ParDo(D) should see the same or newer contents of
> PCollectionView S than ParDo(C) saw.
>

I think it is easy to talk about happens-before using only data provenance
here. If ParDo(D) sees an element that is caused by some element `y` in S,
then ParDo(D) is effectively observing that `y` is in S indirectly, so it
must also be observed directly.
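
One way to picture the provenance argument is the plain-Python sketch below.
It is not Beam code: the "caused_by" field and the missing_causes helper are
hypothetical, and they assume each derived element carries the S elements
that produced it. The check reports every cause that ParDo(D) observes
indirectly but cannot see directly in its own view of S.

```python
def missing_causes(observed_elements, view_of_s):
    """Return causes seen indirectly (via derived elements) but absent from the view."""
    missing = set()
    for element in observed_elements:
        missing |= set(element["caused_by"]) - set(view_of_s)
    return missing


# ParDo(C) read `y` from S and emitted an element derived from it.
derived = {"value": "f(y)", "caused_by": ["y"]}

stale_view = {"x"}        # D's view of S has not caught up yet
fresh_view = {"x", "y"}   # D's view of S includes the cause

print(missing_causes([derived], stale_view))  # {'y'}
print(missing_causes([derived], fresh_view))  # set()
```

A non-empty result corresponds to the situation the happens-before rule would
forbid.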

The case of meta-keys is harder, because there is not automatically a data
provenance argument. You could perhaps make a pipeline that forces it by
computing the meta-key downstream from the other outputs. Therefore
observing the meta key implies indirectly observing the outputs. But you
cannot make the data dependency go in both directions, I think, and this
is also not the most efficient way to implement it.

Kenn

Re: [DISCUSS] Side input consistency guarantees for triggers with multiple firings

Posted by Lukasz Cwik <lc...@google.com>.
Yes, if we had such a pipeline:
ParDo(A) --> PCollectionView S

  ...
   |
ParDo(C) <-(side input)- PCollectionView S
   |
  ...
   |
ParDo(D) <-(side input)- PCollectionView S
   |
  ...

We could reason that ParDo(D) should see the same or newer contents of
PCollectionView S than ParDo(C) saw.
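
A rough sketch of that "same or newer" rule, in plain Python rather than the
Beam API. It assumes accumulating firings, so each new version of S contains
the previous one, and it assumes the version ParDo(C) read can be passed
along to ParDo(D). VersionedView and version_for_downstream are hypothetical
names used only for illustration.

```python
class VersionedView:
    """Toy model: each trigger firing appends a new version of the contents."""

    def __init__(self):
        self._versions = [frozenset()]  # version 0: nothing has fired yet

    def fire(self, new_elements):
        # Accumulating mode assumed: each firing extends the previous contents.
        self._versions.append(self._versions[-1] | frozenset(new_elements))

    def read(self, version):
        return self._versions[version]


def version_for_downstream(local_version, upstream_version):
    """A downstream reader never reads an older version than an upstream reader."""
    return max(local_version, upstream_version)


view = VersionedView()
view.fire({"x"})       # version 1
view.fire({"y"})       # version 2

c_version = 2          # ParDo(C) happened to read the newest contents
d_local_version = 1    # the worker running ParDo(D) has only cached version 1
d_version = version_for_downstream(d_local_version, c_version)

# ParDo(D) sees a superset of what ParDo(C) saw.
print(view.read(d_version) >= view.read(c_version))  # True
```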

On Thu, Apr 11, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:

> BTW another issue is when a single triggered PCollectionView is read by
> two different ParDos - each one might have a different view of the trigger.
> This is noticeable if the output of those two ParDos is then joined
> together.
>
> Reuven

Re: [DISCUSS] Side input consistency guarantees for triggers with multiple firings

Posted by Reuven Lax <re...@google.com>.
BTW another issue is when a single triggered PCollectionView is read by two
different ParDos - each one might have a different view of the trigger.
This is noticeable if the output of those two ParDos is then joined
together.
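
The effect can be illustrated with a small plain-Python sketch (no Beam API
involved). The firings list and the enrich helper are made up for
illustration: two ParDos process the same main-input element, but each reads
the side input after a different firing, and the disagreement only becomes
visible once their outputs are joined.

```python
# Contents of the triggered view after the first and the second firing.
firings = [["x"], ["x", "y"]]


def enrich(element, view_contents, label):
    """Annotate a main-input element with the size of the view the ParDo saw."""
    return (element, {label: len(view_contents)})


# The same element, processed by two ParDos that read the same view
# but at different firings.
left = enrich("e1", firings[0], "size_seen_by_left")
right = enrich("e1", firings[1], "size_seen_by_right")

# Join the two outputs on the element.
joined = {**left[1], **right[1]}
print(joined)  # {'size_seen_by_left': 1, 'size_seen_by_right': 2}
```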

Reuven

On Thu, Apr 11, 2019 at 10:39 AM Kenneth Knowles <ke...@apache.org> wrote:

> The consistency problem occurs even in a single output PCollection that is
> read as a side input, because two output elements can be re-bundled and
> materialized in separate updates to the side input.
>
> Kenn

Re: [DISCUSS] Side input consistency guarantees for triggers with multiple firings

Posted by Kenneth Knowles <ke...@apache.org>.
The consistency problem occurs even in a single output PCollection that is
read as a side input, because two output elements can be re-bundled and
materialized in separate updates to the side input.
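
As a hedged illustration in plain Python (not Beam code), the helper below
lists the states a reader could observe if the side input is updated once per
committed bundle. visible_states and the bundle layouts are assumptions made
for this sketch. When X and Y stay in one bundle there is no intermediate
state; once they are re-bundled separately, a state containing only X becomes
observable.

```python
def visible_states(bundles):
    """Return the side input contents observable after each committed bundle."""
    contents = set()
    states = []
    for bundle in bundles:
        contents |= set(bundle)
        states.append(frozenset(contents))
    return states


as_produced = [["X", "Y"]]      # ParDo(A) emitted both in a single bundle
rebundled = [["X"], ["Y"]]      # the runner split them before materializing

print(visible_states(as_produced) == [frozenset({"X", "Y"})])  # True
print(frozenset({"X"}) in visible_states(rebundled))           # True
```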

Kenn

On Thu, Apr 11, 2019 at 10:36 AM Ruoyun Huang <ru...@google.com> wrote:

> With little to no experience with triggers, I am trying to understand the
> problem statement in this discussion.
>
> If a user is aware of the potential non-deterministic behavior, isn't it
> almost trivial to refactor their code, by putting PCollectionViews S
> and T into one single PCollectionView S', to get around the issue? I
> cannot think of a reason (am I wrong?) why a user *has* to put data into two
> separate PCollectionViews in a single ParDo(A).

Re: [DISCUSS] Side input consistency guarantees for triggers with multiple firings

Posted by Ruoyun Huang <ru...@google.com>.
With little to no experience with triggers, I am trying to understand the
problem statement in this discussion.

If a user is aware of the potential non-deterministic behavior, isn't it
almost trivial to refactor their code, by putting PCollectionViews S
and T into one single PCollectionView S', to get around the issue? I
cannot think of a reason (am I wrong?) why a user *has* to put data into two
separate PCollectionViews in a single ParDo(A).

On Thu, Apr 11, 2019 at 10:16 AM Lukasz Cwik <lc...@google.com> wrote:

> Even though what Kenn points out is a major reason for me bringing up this
> topic, I didn't want to limit this discussion to how side inputs could work.
> More generally, I'd like to know what users want from their side inputs
> when dealing with multiple firings.
>
> On Thu, Apr 11, 2019 at 10:09 AM Kenneth Knowles <ke...@apache.org> wrote:
>
>> Luke & I talked in person a bit. I want to give context for what is at
>> stake here, in terms of side inputs in portability. A decent starting place
>> is https://s.apache.org/beam-side-inputs-1-pager
>>
>> In that general design, the runner offers the SDK just one (or a few)
>> materialization strategies, and the SDK builds idiomatic structures on top
>> of it. Concretely, the Fn API today offers a multimap structure, and the
>> idea was that the SDK could cleverly prepare a PCollection<KV<...>> for the
>> runner to materialize. As a naive example, a simple iterable structure
>> could just map all elements to one dummy key in the multimap. But if you
>> wanted a list plus its length, then you might map all elements to an
>> element key and the length to a special length meta-key.
>>
>> So there is a problem: if the SDK is outputting a new KV<"elements-key",
>> ...> and KV<"length-key", ...> for the runner to materialize then consumers
>> of the side input need to see both updates to the materialization or
>> neither. In general, these outputs might span many keys.
>>
>> It seems like there are a few ways to resolve this tension:
>>
>>  - Establish a consistency model so these updates will be observed
>> together. Seems hard and whatever we come up with will limit runners, limit
>> efficiency, and potentially leak into users having to reason about
>> concurrency
>>
>>  - Instead of building the variety of side input views on one primitive
>> multimap materialization, force runners to provide many primitive
>> materializations with consistency under the hood. Not hard to get started,
>> but adds an unfortunate new dimension for runners to vary in functionality
>> and performance, versus letting them optimize just one or a few
>> materializations
>>
>>  - Have no consistency and just not support side input methods that would
>> require consistent metadata. I'm curious what features this will hurt.
>>
>>  - Have no consistency but require the SDK to build some sort of large
>> value, since single-element consistency is always built into the model.
>> Today many runners do concatenate all elements into one value, though that
>> does not perform well. Making this effective probably requires new model
>> features.
>>
>> Kenn

-- 
================
Ruoyun  Huang

Re: [DISCUSS] Side input consistency guarantees for triggers with multiple firings

Posted by Lukasz Cwik <lc...@google.com>.
Even though what Kenn points out is a major reason for me bringing up this
topic, I didn't want to limit this discussion to how side inputs could work.
More generally, I'd like to hear what users want from their side inputs when
dealing with multiple firings.

On Thu, Apr 11, 2019 at 10:09 AM Kenneth Knowles <ke...@apache.org> wrote:

> Luke & I talked in person a bit. I want to give context for what is at
> stake here, in terms of side inputs in portability. A decent starting place
> is https://s.apache.org/beam-side-inputs-1-pager
>
> In that general design, the runner offers the SDK just one (or a few)
> materialization strategies, and the SDK builds idiomatic structures on top
> of it. Concretely, the Fn API today offers a multimap structure, and the
> idea was that the SDK could cleverly prepare a PCollection<KV<...>> for the
> runner to materialize. As a naive example, a simple iterable structure
> could just map all elements to one dummy key in the multimap. But if you
> wanted a list plus its length, then you might map all elements to an
> element key and the length to a special length meta-key.
>
> So there is a problem: if the SDK is outputting a new KV<"elements-key",
> ...> and KV<"length-key", ...> for the runner to materialize then consumers
> of the side input need to see both updates to the materialization or
> neither. In general, these outputs might span many keys.
>
> It seems like there are a few ways to resolve this tension:
>
>  - Establish a consistency model so these updates will be observed
> together. Seems hard and whatever we come up with will limit runners, limit
> efficiency, and potentially leak into users having to reason about
> concurrency
>
>  - Instead of building the variety of side input views on one primitive
> multimap materialization, force runners to provide many primitive
> materializations with consistency under the hood. Not hard to get started,
> but adds an unfortunate new dimension for runners to vary in functionality
> and performance, versus letting them optimize just one or a few
> materializations
>
>  - Have no consistency and just not support side input methods that would
> require consistent metadata. I'm curious what features this will hurt.
>
>  - Have no consistency but require the SDK to build some sort of large
> value since single-element consistency is built in to the model always.
> Today many runners do concatenate all elements into one value, though that
> does not perform well. Making this effective probably requires new model
> features.
>
> Kenn
>
> On Thu, Apr 11, 2019 at 9:44 AM Reuven Lax <re...@google.com> wrote:
>
>> One thing to keep in mind: triggers that fire multiple times per window
>> already tend to be non deterministic. These are element-count or
>> processing-time triggers, both of which are fairly non deterministic in
>> firing.
>>
>> Reuven
>>
>> On Thu, Apr 11, 2019 at 9:27 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Today, we define that a side input becomes available to be consumed once
>>> at least one firing occurs or when the runner detects that no such output
>>> could be produced (e.g. watermark is beyond the end of the window when
>>> using the default trigger). For triggers that fire at most once, consumers
>>> are guaranteed to have a consistent view of the contents of the side input.
>>> But what happens when the trigger fire multiple times?
>>>
>>> Lets say we have a pipeline containing:
>>> ParDo(A) --> PCollectionView S
>>>          \-> PCollectionView T
>>>
>>>   ...
>>>    |
>>> ParDo(C) <-(side input)- PCollectionView S and PCollectionView T
>>>    |
>>>   ...
>>>
>>> 1) Lets say ParDo(A) outputs (during a single bundle) X and Y to
>>> PCollectionView S, should ParDo(C) see be guaranteed to see X only if it
>>> can also see Y (and vice versa)?
>>>
>>> 2) Lets say ParDo(A) outputs (during a single bundle) X to
>>> PCollectionView S and Y to PCollectionView T, should ParDo(C) be guaranteed
>>> to see X only if it can also see Y?
>>>
>>

Re: [DISCUSS] Side input consistency guarantees for triggers with multiple firings

Posted by Kenneth Knowles <ke...@apache.org>.
Luke & I talked in person a bit. I want to give context for what is at
stake here, in terms of side inputs in portability. A decent starting place
is https://s.apache.org/beam-side-inputs-1-pager

In that general design, the runner offers the SDK just one (or a few)
materialization strategies, and the SDK builds idiomatic structures on top
of it. Concretely, the Fn API today offers a multimap structure, and the
idea was that the SDK could cleverly prepare a PCollection<KV<...>> for the
runner to materialize. As a naive example, a simple iterable structure
could just map all elements to one dummy key in the multimap. But if you
wanted a list plus its length, then you might map all elements to an
element key and the length to a special length meta-key.

So there is a problem: if the SDK is outputting a new KV<"elements-key",
...> and KV<"length-key", ...> for the runner to materialize, then consumers
of the side input need to see both updates to the materialization or
neither. In general, these outputs might span many keys.
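
To make the problem concrete, here is a minimal sketch in plain Python (not
the Beam or Fn API). The MultimapView class, the apply_firing helper, and the
idea that an update can become visible one key at a time are assumptions made
only for illustration; the key names are borrowed from the example above.

```python
from collections import defaultdict

ELEMENTS_KEY = "elements-key"  # key names borrowed from the example above
LENGTH_KEY = "length-key"


def encode_list_with_length(values):
    """SDK side: turn a list into KV pairs for a multimap materialization."""
    kvs = [(ELEMENTS_KEY, v) for v in values]
    kvs.append((LENGTH_KEY, len(values)))
    return kvs


class MultimapView:
    """Toy stand-in for a runner-materialized multimap (hypothetical)."""

    def __init__(self):
        self._data = defaultdict(list)

    def replace_key(self, key, values):
        # Updates land one key at a time; nothing ties two keys together.
        self._data[key] = list(values)

    def get(self, key):
        return list(self._data[key])


def apply_firing(view, kvs, keys_to_apply):
    """Apply only some keys of a firing, modelling a partially visible update."""
    grouped = defaultdict(list)
    for k, v in kvs:
        grouped[k].append(v)
    for k in keys_to_apply:
        view.replace_key(k, grouped[k])


def is_consistent(view):
    """True when the stored length matches the number of stored elements."""
    lengths = view.get(LENGTH_KEY)
    return len(lengths) == 1 and lengths[0] == len(view.get(ELEMENTS_KEY))


view = MultimapView()
apply_firing(view, encode_list_with_length(["a", "b"]), [ELEMENTS_KEY, LENGTH_KEY])
first_ok = is_consistent(view)

# Second firing: the elements update is visible, the length update is not yet.
second = encode_list_with_length(["a", "b", "c"])
apply_firing(view, second, [ELEMENTS_KEY])
torn_ok = is_consistent(view)

# Once the length update also lands, the view is consistent again.
apply_firing(view, second, [LENGTH_KEY])
final_ok = is_consistent(view)
```

A consumer reading between the two partial updates would see three elements
but a length of two, which is exactly the "both or neither" requirement being
violated.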

It seems like there are a few ways to resolve this tension:

 - Establish a consistency model so these updates will be observed
together. This seems hard, and whatever we come up with will limit runners,
limit efficiency, and potentially leak into users having to reason about
concurrency.

 - Instead of building the variety of side input views on one primitive
multimap materialization, force runners to provide many primitive
materializations with consistency under the hood. Not hard to get started,
but adds an unfortunate new dimension for runners to vary in functionality
and performance, versus letting them optimize just one or a few
materializations.

 - Have no consistency and just not support side input methods that would
require consistent metadata. I'm curious what features this will hurt.

 - Have no consistency but require the SDK to build some sort of large
value, since single-element consistency is always built into the model.
Today many runners do concatenate all elements into one value, though that
does not perform well. Making this effective probably requires new model
features.
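
As a rough illustration of that last option, the sketch below (plain Python,
not a Beam API) packs the elements and their length into a single value, so a
reader can only ever observe the old value or the new one. The ListWithLength
type and SingleValueView class are hypothetical names invented for this
example.

```python
from typing import NamedTuple, Tuple


class ListWithLength(NamedTuple):
    """One value carrying both the data and its metadata (hypothetical type)."""
    elements: Tuple[str, ...]
    length: int


def pack(values):
    """Build the single large value from the full contents of a firing."""
    items = tuple(values)
    return ListWithLength(elements=items, length=len(items))


class SingleValueView:
    """Toy side input holding exactly one value (hypothetical)."""

    def __init__(self, initial):
        self._value = initial

    def publish(self, value):
        # One reference swap: readers see the old value or the new one.
        self._value = value

    def read(self):
        return self._value


single_view = SingleValueView(pack([]))
snapshots = [single_view.read()]
for firing in (["a", "b"], ["a", "b", "c"]):
    single_view.publish(pack(firing))
    snapshots.append(single_view.read())

# Every snapshot a reader could have taken is internally consistent.
all_consistent = all(s.length == len(s.elements) for s in snapshots)
```

The cost is visible in pack: the whole value is rebuilt on every firing, which
is one way to read the remark above that concatenating all elements into one
value does not perform well.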

Kenn

On Thu, Apr 11, 2019 at 9:44 AM Reuven Lax <re...@google.com> wrote:

> One thing to keep in mind: triggers that fire multiple times per window
> already tend to be non deterministic. These are element-count or
> processing-time triggers, both of which are fairly non deterministic in
> firing.
>
> Reuven
>
> On Thu, Apr 11, 2019 at 9:27 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> Today, we define that a side input becomes available to be consumed once
>> at least one firing occurs or when the runner detects that no such output
>> could be produced (e.g. watermark is beyond the end of the window when
>> using the default trigger). For triggers that fire at most once, consumers
>> are guaranteed to have a consistent view of the contents of the side input.
>> But what happens when the trigger fire multiple times?
>>
>> Lets say we have a pipeline containing:
>> ParDo(A) --> PCollectionView S
>>          \-> PCollectionView T
>>
>>   ...
>>    |
>> ParDo(C) <-(side input)- PCollectionView S and PCollectionView T
>>    |
>>   ...
>>
>> 1) Lets say ParDo(A) outputs (during a single bundle) X and Y to
>> PCollectionView S, should ParDo(C) see be guaranteed to see X only if it
>> can also see Y (and vice versa)?
>>
>> 2) Lets say ParDo(A) outputs (during a single bundle) X to
>> PCollectionView S and Y to PCollectionView T, should ParDo(C) be guaranteed
>> to see X only if it can also see Y?
>>
>

Re: [DISCUSS] Side input consistency guarantees for triggers with multiple firings

Posted by Reuven Lax <re...@google.com>.
One thing to keep in mind: triggers that fire multiple times per window
already tend to be non-deterministic. These are element-count or
processing-time triggers, both of which are fairly non-deterministic in
firing.
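
A simplified sketch of why an element-count trigger is non-deterministic, in
plain Python rather than Beam: the fire_every_n helper is hypothetical and
fires exactly every n elements with accumulating panes, whereas a real runner
may fire later than that. The same elements arriving in a different order
produce different intermediate panes.

```python
def fire_every_n(arrivals, n):
    """Emit an accumulating pane each time n more elements have arrived."""
    panes, seen = [], []
    for element in arrivals:
        seen.append(element)
        if len(seen) % n == 0:
            panes.append(sorted(seen))
    return panes


# Same four elements in the same window, two possible arrival orders.
panes_run_1 = fire_every_n(["x1", "x2", "y1", "y2"], n=2)
panes_run_2 = fire_every_n(["x1", "y1", "x2", "y2"], n=2)
```

Here the first pane differs between the two runs while the final pane is the
same, so what a side input consumer sees early on depends on arrival order.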

Reuven

On Thu, Apr 11, 2019 at 9:27 AM Lukasz Cwik <lc...@google.com> wrote:

> Today, we define that a side input becomes available to be consumed once
> at least one firing occurs or when the runner detects that no such output
> could be produced (e.g. watermark is beyond the end of the window when
> using the default trigger). For triggers that fire at most once, consumers
> are guaranteed to have a consistent view of the contents of the side input.
> But what happens when the trigger fire multiple times?
>
> Lets say we have a pipeline containing:
> ParDo(A) --> PCollectionView S
>          \-> PCollectionView T
>
>   ...
>    |
> ParDo(C) <-(side input)- PCollectionView S and PCollectionView T
>    |
>   ...
>
> 1) Lets say ParDo(A) outputs (during a single bundle) X and Y to
> PCollectionView S, should ParDo(C) see be guaranteed to see X only if it
> can also see Y (and vice versa)?
>
> 2) Lets say ParDo(A) outputs (during a single bundle) X to PCollectionView
> S and Y to PCollectionView T, should ParDo(C) be guaranteed to see X only
> if it can also see Y?
>