Posted to dev@beam.apache.org by Pablo Estrada <pa...@google.com> on 2021/09/23 23:30:12 UTC

Documenting per-key delivery semantics for various runners

Hi all,
I've been spending some time thinking about CDC use cases on Beam. One
valuable piece to enable these use cases is to define how Beam deals with
ordering of elements in streaming pipelines.
With that in mind, I wrote a document[1] that proposes a definition of the
ordering semantics supported by most Beam runners, and a pull request [2]
with ValidatesRunner tests and documentation updates.

Would you please review these, add your comments and thoughts, and let me
know if they make sense?

Thanks!
-P.

[1]
https://docs.google.com/document/d/1_7WRJznXlOtWuVaHl_dpy8OZcx_M8BUmeWVA4G0-wEc/edit#

[2] https://github.com/apache/beam/pull/15378

Re: Documenting per-key delivery semantics for various runners

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Pablo,

thanks for the motivating examples. I understand the motivation now, but
one question comes to mind: are we fine with not covering the case where
another (grouping) PTransform that _changes the key_ sits between the
emitting PTransform and the consuming PTransform? Through such a change of
key, two elements originally emitted with the same key can move to two
different keys and then back to the same one, which would obviously
violate any ordering defined on the original emitting transform. I'm
aware that the per-key definition describes the two PTransforms as
directly connected. I'm just asking whether we would want to solve this
for the more general case.

In the original design document for @RequiresTimeSortedInput [1], there
was a mention (not implemented) of a "User supplied sorting criterion",
which I believe is exactly what, for instance, a Kafka offset offers, and
what is actually described by the per-key delivery semantics. The key
point is that each element is (somehow) assigned a sequential ID, which
then defines the order as seen by a per-key authoritative _observer_ (for
instance, the source emitting transform; in the case of Kafka that is
the leader for a partition, and so on). If this per-key sequential ID is
carried along with the element, then the order can be reconstructed at any
downstream stage.
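
To make the idea concrete, here is a minimal sketch in plain Python (not
Beam code) of how a downstream stage could reconstruct per-key order from
such a sequential ID. The class name PerKeyReorderBuffer and its methods
are hypothetical. The sketch assumes that IDs are dense per key and start
at 0, and that late duplicates can simply be dropped; a real source (for
instance Kafka offsets) may need different handling, and in a pipeline this
logic would presumably live in some per-key stateful step.

```python
class PerKeyReorderBuffer:
    """Releases elements of each key in sequence-ID order (hypothetical helper).

    Assumption: sequence IDs are dense per key and start at `first_seq`.
    """

    def __init__(self, first_seq=0):
        self._first_seq = first_seq
        self._next_seq = {}  # key -> next sequence ID that may be released
        self._pending = {}   # key -> {seq: value} for elements that arrived early

    def add(self, key, seq, value):
        """Accepts one element and returns the values of `key` that are now
        releasable, in sequence order (possibly an empty list)."""
        expected = self._next_seq.get(key, self._first_seq)
        if seq < expected:
            # Already released earlier; dropping duplicates is an assumption.
            return []
        pending = self._pending.setdefault(key, {})
        pending[seq] = value
        released = []
        # Drain the buffer while the next expected ID is available.
        while expected in pending:
            released.append(pending.pop(expected))
            expected += 1
        self._next_seq[key] = expected
        return released


if __name__ == "__main__":
    buf = PerKeyReorderBuffer()
    # Elements for key "a" arrive out of order; key "b" is independent.
    for key, seq, value in [("a", 1, "a1"), ("b", 0, "b0"), ("a", 0, "a0")]:
        print(key, seq, buf.add(key, seq, value))
```

Because the ID travels with the element, the buffer does not care whether
the key was changed and restored in between; it only needs the original
key and ID to be available again at the point where order matters.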

I'm +1 to splitting the documentation and validation / implementation
parts; that sounds good to me.

  Jan

[1] 
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing


On 9/27/21 11:56 PM, Pablo Estrada wrote:
> Hello all,
> thanks for your comments.
>
> All runners that I've tested have these semantics for streaming. See
> the PR [1] with the capability matrix changes.
>
> I agree it makes sense to add a pipeline check for this. I think I 
> would like to receive comments / agreement on the definition and the 
> changes to the documentation - and then follow up with the pipeline 
> check. Is that reasonable for everyone?
>
> I have added a section of motivating use cases to the document, Jan. 
> Let me know if those make sense.
>
> [1] https://github.com/apache/beam/pull/15378
>
> Thanks
> -P

Re: Documenting per-key delivery semantics for various runners

Posted by Pablo Estrada <pa...@google.com>.
Hello all,
thanks for your comments.

All runners that I've tested have these semantics for streaming. See the
PR [1] with the capability matrix changes.

I agree it makes sense to add a pipeline check for this. I think I would
like to receive comments / agreement on the definition and the changes to
the documentation - and then follow up with the pipeline check. Is that
reasonable for everyone?

I have added a section of motivating use cases to the document, Jan. Let me
know if those make sense.

[1] https://github.com/apache/beam/pull/15378

Thanks
-P

On Sat, Sep 25, 2021 at 1:06 PM Jan Lukavský <je...@seznam.cz> wrote:

> +1 to adding a Pipeline requirement for this. If business logic relies
> on a specific feature that a runner might or might not have, then the
> Pipeline should be rejected on runners that do not support it. Do we have
> a list of runners that have or lack these semantics? Just for
> clarification (sorry for my ignorance if this has already been
> described): do we have a description of the use cases that drive this
> effort?
>
> Thanks,
>
>   Jan

Re: Documenting per-key delivery semantics for various runners

Posted by Jan Lukavský <je...@seznam.cz>.
+1 to adding a Pipeline requirement for this. If business logic relies
on a specific feature that a runner might or might not have, then the
Pipeline should be rejected on runners that do not support it. Do we have
a list of runners that have or lack these semantics? Just for
clarification (sorry for my ignorance if this has already been
described): do we have a description of the use cases that drive this
effort?

Thanks,

  Jan

On 9/24/21 10:58 PM, Robert Bradshaw wrote:
> Thanks for writing this up. Rather than just documenting it, should we
> have a way of asserting/requesting it (like time-sorted inputs), so
> that a pipeline that needs to rely on this property is
> rejected on runners that don't provide it?

Re: Documenting per-key delivery semantics for various runners

Posted by Robert Bradshaw <ro...@google.com>.
Thanks for writing this up. Rather than just documenting it, should we
have a way of asserting/requesting it (like time-sorted inputs), so
that a pipeline that needs to rely on this property is
rejected on runners that don't provide it?
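
As a rough illustration of the kind of check meant here, the following is
a minimal sketch in plain Python; it is not Beam's actual API. The
function check_requirements, the exception UnsupportedRequirementError,
and the capability names used in the example are all hypothetical. The
sketch assumes that a pipeline can list the properties it relies on and
that a runner can list the properties it guarantees.

```python
class UnsupportedRequirementError(Exception):
    """Raised when a pipeline needs a property the runner does not declare."""


def check_requirements(pipeline_requirements, runner_capabilities):
    """Rejects the pipeline if any required property is not provided.

    Both arguments are iterables of capability names (hypothetical strings).
    Returns None when every requirement is satisfied.
    """
    missing = sorted(set(pipeline_requirements) - set(runner_capabilities))
    if missing:
        raise UnsupportedRequirementError(
            "runner does not support: " + ", ".join(missing)
        )


if __name__ == "__main__":
    # Hypothetical capability names, used for illustration only.
    requirements = {"per_key_ordered_delivery"}
    check_requirements(
        requirements, {"per_key_ordered_delivery", "time_sorted_input"}
    )
    try:
        check_requirements(requirements, {"time_sorted_input"})
    except UnsupportedRequirementError as err:
        print("pipeline rejected:", err)
```

The point of failing at submission time is that the author learns about
the missing guarantee before any data is processed, rather than through
silently reordered output.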

On Fri, Sep 24, 2021 at 12:25 PM Kenneth Knowles <ke...@apache.org> wrote:
>
> Took a look. I definitely agree that something like this is useful, and well-motivated by the use cases you raise.
>
> Kenn

Re: Documenting per-key delivery semantics for various runners

Posted by Kenneth Knowles <ke...@apache.org>.
Took a look. I definitely agree that something like this is useful, and
well-motivated by the use cases you raise.

Kenn
