Posted to dev@beam.apache.org by Aaron Dixon <at...@gmail.com> on 2019/11/25 21:27:03 UTC

real real-time beam

Suppose I trigger a Combine per-element (in a high-volume stream) and use a
ParDo as a sink.

I assume there is no guarantee about the order that my ParDo will see these
triggers, especially as it processes in parallel, anyway.

That said, my sink writes to a db or cache and I would not like the cache
to ever regress its value to something "before" what it has already written.

Is the best way to solve this problem to always write the event-time in the
cache and do a compare-and-swap only updating the sink if the triggered
value in-hand is later than the target value?
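
(For concreteness, roughly what I mean - a sketch only, where KVStore and
its compareAndSwap are hypothetical placeholders for my actual db/cache
client:)

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class CasSink extends DoFn<KV<String, Long>, Void> {
  private transient KVStore store;  // hypothetical db/cache client

  @Setup
  public void setup() {
    store = KVStore.connect();
  }

  @ProcessElement
  public void process(ProcessContext c) {
    long eventTimeMillis = c.timestamp().getMillis();
    while (true) {
      KVStore.Entry current = store.get(c.element().getKey());
      if (current != null && current.eventTimeMillis >= eventTimeMillis) {
        return;  // target already holds a later value; never regress it
      }
      // Atomically replace only if the target still holds `current`;
      // otherwise a concurrent writer won the race, so re-read and retry.
      if (store.compareAndSwap(c.element().getKey(), current,
          new KVStore.Entry(eventTimeMillis, c.element().getValue()))) {
        return;
      }
    }
  }
}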

Or is there a better way to guarantee that my ParDo sink will process
elements in-order? (E.g., if I can give up per-event/real-time, then a
delay-based trigger would probably be sufficient, I imagine.)

Thanks for advice!

Re: real real-time beam

Posted by Pablo Estrada <pa...@google.com>.
If I understand correctly - your pipeline has some kind of windowing, and
on every trigger downstream of the combiner, the pipeline updates a cache
with a single, non-windowed value. Is that correct?

What are your keys for this pipeline? You could work this out with, as you
noted, a timer that fires periodically, and keeps some state with the value
that you want to update to the cache.
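
For instance, with the Java SDK that could look roughly like this (a sketch
only - the flush interval and the downstream cache write are placeholders):

import org.apache.beam.sdk.state.*;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class LatestValuePerKey extends DoFn<KV<String, Long>, KV<String, Long>> {
  @StateId("latest")
  private final StateSpec<ValueState<KV<String, Long>>> latestSpec =
      StateSpecs.value();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("latest") ValueState<KV<String, Long>> latest,
      @TimerId("flush") Timer flush) {
    latest.write(c.element());  // remember only the newest value for this key
    flush.offset(Duration.standardSeconds(10)).setRelative();  // flush soon
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext c,
      @StateId("latest") ValueState<KV<String, Long>> latest) {
    KV<String, Long> latestValue = latest.read();
    if (latestValue != null) {
      c.output(latestValue);  // a downstream ParDo does the actual cache write
    }
  }
}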

Is this a Python or Java pipeline? What is the runner?
Best
-P.

On Mon, Nov 25, 2019 at 1:27 PM Aaron Dixon <at...@gmail.com> wrote:

> Suppose I trigger a Combine per-element (in a high-volume stream) and use
> a ParDo as a sink.
>
> I assume there is no guarantee about the order that my ParDo will see
> these triggers, especially as it processes in parallel, anyway.
>
> That said, my sink writes to a db or cache and I would not like the cache
> to ever regress its value to something "before" what it has already written.
>
> Is the best way to solve this problem to always write the event-time in
> the cache and do a compare-and-swap only updating the sink if the triggered
> value in-hand is later than the target value?
>
> Or is there a better way to guarantee that my ParDo sink will process
> elements in-order? (Eg, if I can give up per-event/real-time, then a
> delay-based trigger would probably be sufficient I imagine.)
>
> Thanks for advice!
>

Re: real real-time beam

Posted by Aaron Dixon <at...@gmail.com>.
> Aaron - do you have the information you need to implement your sink? My
> impression is that you have quite a good grasp of the issues even before
> you asked.

Yes I do, thank you. I really appreciate the thorough help from everyone.

On Wed, Dec 4, 2019 at 9:41 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Kenn,
> On 12/4/19 5:38 AM, Kenneth Knowles wrote:
>
> Jan - let's try to defrag the threads on your time sorting proposal. This
> thread may have useful ideas but I want to focus on helping Aaron in this
> thread. You can link to this thread from other threads or from a design
> doc. Does this seem OK to you?
>
> sure. :-)
>
> I actually think the best thread to continue the discussion would be [1].
> The reason why this discussion probably got fragmented is that the other
> threads seem to die out without any conclusion. :-(
>
> Jan
>
> [1]
> https://lists.apache.org/thread.html/e2f729c7cea22553fc34421d4547132fa1c2ec01035eb4fb1a426873%40%3Cdev.beam.apache.org%3E
>
>
> Aaron - do you have the information you need to implement your sink? My
> impression is that you have quite a good grasp of the issues even before
> you asked.
>
> Kenn
>
> On Wed, Nov 27, 2019 at 3:05 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> > Trigger firings can have decreasing event timestamps w/ the minimum
>> timestamp combiner*. I do think the issue at hand is best analyzed in terms
>> of the explicit ordering on panes. And I do think we need to have an
>> explicit guarantee or annotation strong enough to describe a
>> correct-under-all-allowed runners sink. Today an antagonistic runner could
>> probably break a lot of things.
>>
>> Thanks for this insight. I didn't know about the relation between trigger
>> firing (event) time - which is always non-decreasing - and the resulting
>> timestamp of output pane - which can be affected by timestamp combiner and
>> decrease in cases you describe. What actually correlates with the pane
>> index at all times is the processing time of trigger firings. Would you
>> say that the "annotation that would guarantee ordering of panes" could be
>> viewed as a time-ordering annotation with an additional time domain (event
>> time, processing time)? Could these two then be viewed as a single one
>> with some distinguishing parameter?
>>
>> @RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)
>>
>> ?
>>
>> Event time should probably be made the default, because that is
>> information that is accessible with every WindowedValue, while pane index
>> is available only after GBK (or generally might be available after every
>> keyed sequential operation, but is missing after source for instance).
>>
>> Jan
>> On 11/27/19 1:32 AM, Kenneth Knowles wrote:
>>
>>
>>
>> On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> > I will not try to formalize this notion in this email. But I will note
>>> that since it is universally assured, it would be zero cost and
>>> significantly safer to formalize it and add an annotation noting it was
>>> required. It has nothing to do with event time ordering, only trigger
>>> firing ordering.
>>>
>>> I cannot agree with the last sentence (and I'm really not doing this on
>>> purpose :-)). Panes generally arrive out of order, as mentioned several
>>> times in the discussions linked from this thread. If we want to ensure
>>> "trigger firing ordering", we can use the pane index, that is correct. But
>>> - that is actually equivalent to sorting by event time, because pane index
>>> order will be (nearly) the same as event time order. This is due to the
>>> fact that pane index and event time correlate (both are monotonic).
>>>
>> Trigger firings can have decreasing event timestamps w/ the minimum
>> timestamp combiner*. I do think the issue at hand is best analyzed in terms
>> of the explicit ordering on panes. And I do think we need to have an
>> explicit guarantee or annotation strong enough to describe a
>> correct-under-all-allowed runners sink. Today an antagonistic runner could
>> probably break a lot of things.
>>
>> Kenn
>>
>> *In fact, they can decrease via the "maximum" timestamp combiner, because
>> timestamp combiners actually only apply to the elements in that particular
>> pane. This is weird, and maybe a design bug, but good to know about.
>>
>>
>>> The pane index "only" solves the issue of preserving ordering even in
>>> case where there are multiple firings within the same timestamp (regardless
>>> of granularity). This was mentioned in the initial discussion about event
>>> time ordering, and is part of the design doc - users should be allowed to
>>> provide UDF for extracting time-correlated ordering field (which means
>>> ability to choose a preferred, or authoritative, observer which assigns
>>> unambiguous ordering to events). Examples of this might include Kafka
>>> offsets as well, or any queue index for that matter. This is not yet
>>> implemented, but could (should) be in the future.
>>>
>>> The only case where these two things are (somewhat) different is the
>>> case mentioned by @Steve - if the output is a stateless ParDo, which will get
>>> fused. But that is only because the processing is single-threaded per key,
>>> and therefore the ordering is implied by timer ordering (and careful here,
>>> many runners don't have this ordering 100% correct, as of now - this
>>> problem luckily appears only when there are multiple timers per key).
>>> Moreover, if there should be a failure, then the output might (would) get
>>> back in time anyway. If there would be a shuffle operation after
>>> GBK/Combine, then the ordering is no longer guaranteed and must be
>>> explicitly taken care of.
>>>
>>> Last note, I must agree with @Rui that all these discussions are very
>>> much related to retractions (precisely the ability to implement them).
>>>
>>> Jan
>>> On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>>>
>>> Hi Aaron,
>>>
>>> Another insightful observation.
>>>
>>> Whenever an aggregation (GBK / Combine per key) has a trigger firing,
>>> there is a per-key sequence number attached. It is included in metadata
>>> known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially
>>> referred to as the "pane index". You can also make use of the "on time
>>> index" if you like. The best way to access this metadata is to add a
>>> parameter of type PaneInfo to your DoFn's @ProcessElement method. This
>>> works for stateful or stateless DoFn.
>>>
>>> Most of Beam's IO connectors do not explicitly enforce that outputs
>>> occur in pane index order but instead rely on the hope that the runner
>>> delivers panes in order to the sink. IMO this is dangerous but it has not
>>> yet caused a known issue. In practice, each "input key to output key 'path'
>>> " through a pipeline's logic does preserve order for all existing runners
>>> AFAIK and it is the formalization that is missing. It is related to an
>>> observation by +Rui Wang <ru...@google.com> that processing
>>> retractions requires the same key-to-key ordering.
>>>
>>> I will not try to formalize this notion in this email. But I will note
>>> that since it is universally assured, it would be zero cost and
>>> significantly safer to formalize it and add an annotation noting it was
>>> required. It has nothing to do with event time ordering, only trigger
>>> firing ordering.
>>>
>>> Kenn
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>>>
>>>
>>> On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <pa...@google.com>
>>> wrote:
>>>
>>>> The blog posts on stateful and timely computation with Beam should help
>>>> clarify a lot about how to use state and timers to do this:
>>>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>
>>>> You'll see there how there's an implicit per-single-element grouping
>>>> for each key, so state and timers should support your use case very well.
>>>>
>>>> Best
>>>> -P.
>>>>
>>>> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sn...@apache.org>
>>>> wrote:
>>>>
>>>>> If you have a pipeline that looks like Input -> GroupByKey -> ParDo,
>>>>> while it is not guaranteed, in practice the sink will observe the trigger
>>>>> firings in order (per key), since it'll be fused to the output of the GBK
>>>>> operation (in all runners I know of).
>>>>>
>>>>> There have been a couple threads about trigger ordering as well on the
>>>>> list recently that might have more information:
>>>>>
>>>>> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>>>>
>>>>> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>>>>
>>>>>
>>>>> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <at...@gmail.com> wrote:
>>>>>
>>>>>> @Jan @Pablo Thank you
>>>>>>
>>>>>> @Pablo In this case it's a single global windowed Combine/perKey,
>>>>>> triggered per element. Keys are few (client accounts) so they can live
>>>>>> forever.
>>>>>>
>>>>>> It looks like just by virtue of using a stateful ParDo I could get
>>>>>> this final execution to be "serialized" per key. (Then I could simply do
>>>>>> the compare-and-swap using Beam's state mechanism to keep track of the
>>>>>> "latest trigger timestamp" instead of having to orchestrate
>>>>>> compare-and-swap in the target store :thinking:.)
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>>> One addition, to make the list of options exhaustive, there is
>>>>>>> probably one more option:
>>>>>>>
>>>>>>>   c) create a ParDo keyed by the primary key of your sink, cache the
>>>>>>> last write in there and compare it locally, without the need to query
>>>>>>> the database
>>>>>>>
>>>>>>> It would still need some timer to clear values after watermark +
>>>>>>> allowed lateness, because otherwise you would have to cache your whole
>>>>>>> database on workers. But because you don't need actual ordering - you
>>>>>>> just need the most recent value (if I got it right) - this might be an
>>>>>>> option.
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>>>>>> > Hi Aaron,
>>>>>>> >
>>>>>>> > maybe someone else will give another option, but if I understand
>>>>>>> > correctly what you want to solve, then you essentially have to do
>>>>>>> either:
>>>>>>> >
>>>>>>> >  a) use the compare & swap mechanism in the sink you described
>>>>>>> >
>>>>>>> >  b) use a buffer to buffer elements inside the outputting ParDo
>>>>>>> and
>>>>>>> > only output them when watermark passes (using a timer).
>>>>>>> >
>>>>>>> > There is actually an ongoing discussion about how to make option
>>>>>>> b)
>>>>>>> > user-friendly and part of Beam itself, but currently there is no
>>>>>>> > out-of-the-box solution for that.
>>>>>>> >
>>>>>>> > Jan
>>>>>>> >
>>>>>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>>>>>> >> Suppose I trigger a Combine per-element (in a high-volume stream)
>>>>>>> and
>>>>>>> >> use a ParDo as a sink.
>>>>>>> >>
>>>>>>> >> I assume there is no guarantee about the order that my ParDo will
>>>>>>> see
>>>>>>> >> these triggers, especially as it processes in parallel, anyway.
>>>>>>> >>
>>>>>>> >> That said, my sink writes to a db or cache and I would not like
>>>>>>> the
>>>>>>> >> cache to ever regress its value to something "before" what it has
>>>>>>> >> already written.
>>>>>>> >>
>>>>>>> >> Is the best way to solve this problem to always write the
>>>>>>> event-time
>>>>>>> >> in the cache and do a compare-and-swap only updating the sink if
>>>>>>> the
>>>>>>> >> triggered value in-hand is later than the target value?
>>>>>>> >>
>>>>>>> >> Or is there a better way to guarantee that my ParDo sink will
>>>>>>> process
>>>>>>> >> elements in-order? (Eg, if I can give up per-event/real-time,
>>>>>>> then a
>>>>>>> >> delay-based trigger would probably be sufficient I imagine.)
>>>>>>> >>
>>>>>>> >> Thanks for advice!
>>>>>>>
>>>>>>

Re: real real-time beam

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Kenn,

On 12/4/19 5:38 AM, Kenneth Knowles wrote:
> Jan - let's try to defrag the threads on your time sorting proposal. 
> This thread may have useful ideas but I want to focus on helping Aaron 
> in this thread. You can link to this thread from other threads or from 
> a design doc. Does this seem OK to you?

sure. :-)

I actually think the best thread to continue the discussion would be 
[1]. The reason why this discussion probably got fragmented is that the 
other threads seem to die out without any conclusion. :-(

Jan

[1] 
https://lists.apache.org/thread.html/e2f729c7cea22553fc34421d4547132fa1c2ec01035eb4fb1a426873%40%3Cdev.beam.apache.org%3E

>
> Aaron - do you have the information you need to implement your sink? 
> My impression is that you have quite a good grasp of the issues even 
> before you asked.
>
> Kenn
>
> On Wed, Nov 27, 2019 at 3:05 AM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     > Trigger firings can have decreasing event timestamps w/ the
>     minimum timestamp combiner*. I do think the issue at hand is best
>     analyzed in terms of the explicit ordering on panes. And I do
>     think we need to have an explicit guarantee or annotation strong
>     enough to describe a correct-under-all-allowed runners sink. Today
>     an antagonistic runner could probably break a lot of things.
>
>     Thanks for this insight. I didn't know about the relation between
>     trigger firing (event) time - which is always non-decreasing - and
>     the resulting timestamp of output pane - which can be affected by
>     timestamp combiner and decrease in cases you describe. What
>     actually correlates with the pane index at all times is the
>     processing time of trigger firings. Would you say that the
>     "annotation that would guarantee ordering of panes" could be
>     viewed as a time-ordering annotation with an additional time
>     domain (event time, processing time)? Could these two then be
>     viewed as a single one with some distinguishing parameter?
>
>     @RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)
>
>     ?
>
>     Event time should probably be made the default, because that is
>     information that is accessible with every WindowedValue, while
>     pane index is available only after GBK (or generally might be
>     available after every keyed sequential operation, but is missing
>     after source for instance).
>
>     Jan
>
>     On 11/27/19 1:32 AM, Kenneth Knowles wrote:
>>
>>
>>     On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <je.ik@seznam.cz
>>     <ma...@seznam.cz>> wrote:
>>
>>         > I will not try to formalize this notion in this email. But
>>         I will note that since it is universally assured, it would be
>>         zero cost and significantly safer to formalize it and add an
>>         annotation noting it was required. It has nothing to do with
>>         event time ordering, only trigger firing ordering.
>>
>>         I cannot agree with the last sentence (and I'm really not
>>         doing this on purpose :-)). Panes generally arrive out of
>>         order, as mentioned several times in the discussions linked
>>         from this thread. If we want to ensure "trigger firing
>>         ordering", we can use the pane index, that is correct. But -
>>         that is actually equivalent to sorting by event time, because
>>         pane index order will be (nearly) the same as event time
>>         order. This is due to the fact that pane index and event
>>         time correlate (both are monotonic).
>>
>>     Trigger firings can have decreasing event timestamps w/ the
>>     minimum timestamp combiner*. I do think the issue at hand is best
>>     analyzed in terms of the explicit ordering on panes. And I do
>>     think we need to have an explicit guarantee or annotation strong
>>     enough to describe a correct-under-all-allowed runners sink.
>>     Today an antagonistic runner could probably break a lot of things.
>>
>>     Kenn
>>
>>     *In fact, they can decrease via the "maximum" timestamp combiner,
>>     because timestamp combiners actually only apply to the elements in
>>     that particular pane. This is weird, and maybe a design bug, but
>>     good to know about.
>>
>>         The pane index "only" solves the issue of preserving ordering
>>         even in case where there are multiple firings within the same
>>         timestamp (regardless of granularity). This was mentioned in
>>         the initial discussion about event time ordering, and is part
>>         of the design doc - users should be allowed to provide UDF
>>         for extracting time-correlated ordering field (which means
>>         ability to choose a preferred, or authoritative, observer
>>         which assigns unambiguous ordering to events). Examples of
>>         this might include Kafka offsets as well, or any queue index
>>         for that matter. This is not yet implemented, but could
>>         (should) be in the future.
>>
>>         The only case where these two things are (somewhat) different
>>         is the case mentioned by @Steve - if the output is a stateless
>>         ParDo, which will get fused. But that is only because the
>>         processing is single-threaded per key, and therefore the
>>         ordering is implied by timer ordering (and careful here, many
>>         runners don't have this ordering 100% correct, as of now -
>>         this problem luckily appears only when there are multiple
>>         timers per key). Moreover, if there should be a failure, then
>>         the output might (would) get back in time anyway. If there
>>         would be a shuffle operation after GBK/Combine, then the
>>         ordering is no longer guaranteed and must be explicitly taken
>>         care of.
>>
>>         Last note, I must agree with @Rui that all these discussions
>>         are very much related to retractions (precisely the ability
>>         to implement them).
>>
>>         Jan
>>
>>         On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>>>         Hi Aaron,
>>>
>>>         Another insightful observation.
>>>
>>>         Whenever an aggregation (GBK / Combine per key) has a
>>>         trigger firing, there is a per-key sequence number attached.
>>>         It is included in metadata known as "PaneInfo" [1]. The
>>>         value of PaneInfo.getIndex() is colloquially referred to as
>>>         the "pane index". You can also make use of the "on time
>>>         index" if you like. The best way to access this metadata is
>>>         to add a parameter of type PaneInfo to your
>>>         DoFn's @ProcessElement method. This works for stateful or
>>>         stateless DoFn.
>>>
>>>         Most of Beam's IO connectors do not explicitly enforce that
>>>         outputs occur in pane index order but instead rely on the
>>>         hope that the runner delivers panes in order to the sink.
>>>         IMO this is dangerous but it has not yet caused a known
>>>         issue. In practice, each "input key to output key 'path' "
>>>         through a pipeline's logic does preserve order for all
>>>         existing runners AFAIK and it is the formalization that is
>>>         missing. It is related to an observation by +Rui Wang
>>>         <ma...@google.com> that processing retractions
>>>         requires the same key-to-key ordering.
>>>
>>>         I will not try to formalize this notion in this email. But I
>>>         will note that since it is universally assured, it would be
>>>         zero cost and significantly safer to formalize it and add an
>>>         annotation noting it was required. It has nothing to do with
>>>         event time ordering, only trigger firing ordering.
>>>
>>>         Kenn
>>>
>>>         [1]
>>>         https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
>>>         [2]
>>>         https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>>>
>>>
>>>         On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada
>>>         <pabloem@google.com <ma...@google.com>> wrote:
>>>
>>>             The blog posts on stateful and timely computation with
>>>             Beam should help clarify a lot about how to use state
>>>             and timers to do this:
>>>             https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>>             https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>
>>>             You'll see there how there's an implicit
>>>             per-single-element grouping for each key, so state and
>>>             timers should support your use case very well.
>>>
>>>             Best
>>>             -P.
>>>
>>>             On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz
>>>             <sniemitz@apache.org <ma...@apache.org>> wrote:
>>>
>>>                 If you have a pipeline that looks like Input ->
>>>                 GroupByKey -> ParDo, while it is not guaranteed, in
>>>                 practice the sink will observe the trigger firings
>>>                 in order (per key), since it'll be fused to the
>>>                 output of the GBK operation (in all runners I know of).
>>>
>>>                 There have been a couple threads about trigger
>>>                 ordering as well on the list recently that might
>>>                 have more information:
>>>                 https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>>                 https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>>
>>>
>>>                 On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon
>>>                 <atdixon@gmail.com <ma...@gmail.com>> wrote:
>>>
>>>                     @Jan @Pablo Thank you
>>>
>>>                     @Pablo In this case it's a single global
>>>                     windowed Combine/perKey, triggered per element.
>>>                     Keys are few (client accounts) so they can live
>>>                     forever.
>>>
>>>                     It looks like just by virtue of using a stateful
>>>                     ParDo I could get this final execution to be
>>>                     "serialized" per key. (Then I could simply do
>>>                     the compare-and-swap using Beam's state
>>>                     mechanism to keep track of the "latest trigger
>>>                     timestamp" instead of having to orchestrate
>>>                     compare-and-swap in the target store :thinking:.)
>>>
>>>
>>>
>>>                     On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský
>>>                     <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>
>>>                         One addition, to make the list of options
>>>                         exhaustive, there is probably
>>>                         one more option
>>>
>>>                           c) create a ParDo keyed by primary key of
>>>                         your sink, cache the last
>>>                         write in there and compare it locally,
>>>                         without the need to query the
>>>                         database
>>>
>>>                         It would still need some timer to clear
>>>                         values after watermark + allowed
>>>                         lateness, because otherwise you would have
>>>                         to cache your whole database
>>>                         on workers. But because you don't need
>>>                         actual ordering, you just need
>>>                         the most recent value (if I got it right)
>>>                         this might be an option.
>>>
>>>                         Jan
>>>
>>>                         On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>>                         > Hi Aaron,
>>>                         >
>>>                         > maybe someone else will give another
>>>                         option, but if I understand
>>>                         > correctly what you want to solve, then you
>>>                         essentially have to do either:
>>>                         >
>>>                         >  a) use the compare & swap mechanism in
>>>                         the sink you described
>>>                         >
>>>                         >  b) use a buffer to buffer elements inside
>>>                         the outputting ParDo and
>>>                         > only output them when watermark passes
>>>                         (using a timer).
>>>                         >
>>>                         > There is actually an ongoing discussion
>>>                         about how to make option b)
>>>                         > user-friendly and part of Beam itself, but
>>>                         currently there is no
>>>                         > out-of-the-box solution for that.
>>>                         >
>>>                         > Jan
>>>                         >
>>>                         > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>>                         >> Suppose I trigger a Combine per-element
>>>                         (in a high-volume stream) and
>>>                         >> use a ParDo as a sink.
>>>                         >>
>>>                         >> I assume there is no guarantee about the
>>>                         order that my ParDo will see
>>>                         >> these triggers, especially as it
>>>                         processes in parallel, anyway.
>>>                         >>
>>>                         >> That said, my sink writes to a db or
>>>                         cache and I would not like the
>>>                         >> cache to ever regress its value to
>>>                         something "before" what it has
>>>                         >> already written.
>>>                         >>
>>>                         >> Is the best way to solve this problem to
>>>                         always write the event-time
>>>                         >> in the cache and do a compare-and-swap
>>>                         only updating the sink if the
>>>                         >> triggered value in-hand is later than the
>>>                         target value?
>>>                         >>
>>>                         >> Or is there a better way to guarantee
>>>                         that my ParDo sink will process
>>>                         >> elements in-order? (Eg, if I can give up
>>>                         per-event/real-time, then a
>>>                         >> delay-based trigger would probably be
>>>                         sufficient I imagine.)
>>>                         >>
>>>                         >> Thanks for advice!
>>>

Re: real real-time beam

Posted by Kenneth Knowles <ke...@apache.org>.
Jan - let's try to defrag the threads on your time sorting proposal. This
thread may have useful ideas but I want to focus on helping Aaron in this
thread. You can link to this thread from other threads or from a design
doc. Does this seem OK to you?

Aaron - do you have the information you need to implement your sink? My
impression is that you have quite a good grasp of the issues even before
you asked.

Kenn

On Wed, Nov 27, 2019 at 3:05 AM Jan Lukavský <je...@seznam.cz> wrote:

> > Trigger firings can have decreasing event timestamps w/ the minimum
> timestamp combiner*. I do think the issue at hand is best analyzed in terms
> of the explicit ordering on panes. And I do think we need to have an
> explicit guarantee or annotation strong enough to describe a
> correct-under-all-allowed runners sink. Today an antagonistic runner could
> probably break a lot of things.
>
> Thanks for this insight. I didn't know about the relation between trigger
> firing (event) time - which is always non-decreasing - and the resulting
> timestamp of output pane - which can be affected by timestamp combiner and
> decrease in cases you describe. What actually correlates with the pane
> index at all times is the processing time of trigger firings. Would you
> say that the "annotation that would guarantee ordering of panes" could be
> viewed as a time-ordering annotation with an additional time domain (event
> time, processing time)? Could these two then be viewed as a single one
> with some distinguishing parameter?
>
> @RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)
>
> ?
>
> Event time should probably be made the default, because that is
> information that is accessible with every WindowedValue, while pane index
> is available only after GBK (or generally might be available after every
> keyed sequential operation, but is missing after source for instance).
>
> Jan
> On 11/27/19 1:32 AM, Kenneth Knowles wrote:
>
>
>
> On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> > I will not try to formalize this notion in this email. But I will note
>> that since it is universally assured, it would be zero cost and
>> significantly safer to formalize it and add an annotation noting it was
>> required. It has nothing to do with event time ordering, only trigger
>> firing ordering.
>>
>> I cannot agree with the last sentence (and I'm really not doing this on
>> purpose :-)). Panes generally arrive out of order, as mentioned several
>> times in the discussions linked from this thread. If we want to ensure
>> "trigger firing ordering", we can use the pane index, that is correct. But
>> - that is actually equivalent to sorting by event time, because pane index
>> order will be (nearly) the same as event time order. This is due to the
>> fact that pane index and event time correlate (both are monotonic).
>>
> Trigger firings can have decreasing event timestamps w/ the minimum
> timestamp combiner*. I do think the issue at hand is best analyzed in terms
> of the explicit ordering on panes. And I do think we need to have an
> explicit guarantee or annotation strong enough to describe a
> correct-under-all-allowed runners sink. Today an antagonistic runner could
> probably break a lot of things.
>
> Kenn
>
> *In fact, they can decrease via the "maximum" timestamp combiner, because
> timestamp combiners actually only apply to the elements in that particular
> pane. This is weird, and maybe a design bug, but good to know about.
>
>
>> The pane index "only" solves the issue of preserving ordering even in
>> case where there are multiple firings within the same timestamp (regardless
>> of granularity). This was mentioned in the initial discussion about event
>> time ordering, and is part of the design doc - users should be allowed to
>> provide UDF for extracting time-correlated ordering field (which means
>> ability to choose a preferred, or authoritative, observer which assigns
>> unambiguous ordering to events). Examples of this might include Kafka
>> offsets as well, or any queue index for that matter. This is not yet
>> implemented, but could (should) be in the future.
>>
>> The only case where these two things are (somewhat) different is the case
>> mentioned by @Steve - if the output is a stateless ParDo, which will get
>> fused. But that is only because the processing is single-threaded per key,
>> and therefore the ordering is implied by timer ordering (and careful here,
>> many runners don't have this ordering 100% correct, as of now - this
>> problem luckily appears only when there are multiple timers per key).
>> Moreover, if there should be a failure, then the output might (would) get
>> back in time anyway. If there would be a shuffle operation after
>> GBK/Combine, then the ordering is no longer guaranteed and must be
>> explicitly taken care of.
>>
>> Last note, I must agree with @Rui that all these discussions are very
>> much related to retractions (precisely the ability to implement them).
>>
>> Jan
>> On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>>
>> Hi Aaron,
>>
>> Another insightful observation.
>>
>> Whenever an aggregation (GBK / Combine per key) has a trigger firing,
>> there is a per-key sequence number attached. It is included in metadata
>> known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially
>> referred to as the "pane index". You can also make use of the "on time
>> index" if you like. The best way to access this metadata is to add a
>> parameter of type PaneInfo to your DoFn's @ProcessElement method. This
>> works for stateful or stateless DoFn.
>>
>> Most of Beam's IO connectors do not explicitly enforce that outputs occur
>> in pane index order but instead rely on the hope that the runner delivers
>> panes in order to the sink. IMO this is dangerous but it has not yet caused
>> a known issue. In practice, each "input key to output key 'path' " through
>> a pipeline's logic does preserve order for all existing runners AFAIK and
>> it is the formalization that is missing. It is related to an observation by +Rui
>> Wang <ru...@google.com> that processing retractions requires the same
>> key-to-key ordering.
>>
>> I will not try to formalize this notion in this email. But I will note
>> that since it is universally assured, it would be zero cost and
>> significantly safer to formalize it and add an annotation noting it was
>> required. It has nothing to do with event time ordering, only trigger
>> firing ordering.
>>
>> Kenn
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>>
>>
>> On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <pa...@google.com> wrote:
>>
>>> The blog posts on stateful and timely computation with Beam should help
>>> clarify a lot about how to use state and timers to do this:
>>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>
>>> You'll see there how there's an implicit per-single-element grouping for
>>> each key, so state and timers should support your use case very well.
>>>
>>> Best
>>> -P.
>>>
>>> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sn...@apache.org>
>>> wrote:
>>>
>>>> If you have a pipeline that looks like Input -> GroupByKey -> ParDo,
>>>> while it is not guaranteed, in practice the sink will observe the trigger
>>>> firings in order (per key), since it'll be fused to the output of the GBK
>>>> operation (in all runners I know of).
>>>>
>>>> There have been a couple threads about trigger ordering as well on the
>>>> list recently that might have more information:
>>>>
>>>> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>>>
>>>> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>>>
>>>>
>>>> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <at...@gmail.com> wrote:
>>>>
>>>>> @Jan @Pablo Thank you
>>>>>
>>>>> @Pablo In this case it's a single global windowed Combine/perKey,
>>>>> triggered per element. Keys are few (client accounts) so they can live
>>>>> forever.
>>>>>
>>>>> It looks like just by virtue of using a stateful ParDo I could get
>>>>> this final execution to be "serialized" per key. (Then I could simply do
>>>>> the compare-and-swap using Beam's state mechanism to keep track of the
>>>>> "latest trigger timestamp" instead of having to orchestrate
>>>>> compare-and-swap in the target store :thinking:.)
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> One addition, to make the list of options exhaustive, there is
>>>>>> probably one more option:
>>>>>>
>>>>>>   c) create a ParDo keyed by the primary key of your sink, cache the
>>>>>> last write in there and compare it locally, without the need to query
>>>>>> the database
>>>>>>
>>>>>> It would still need some timer to clear values after watermark +
>>>>>> allowed lateness, because otherwise you would have to cache your whole
>>>>>> database on workers. But because you don't need actual ordering - you
>>>>>> just need the most recent value (if I got it right) - this might be an
>>>>>> option.
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>>>>> > Hi Aaron,
>>>>>> >
>>>>>> > maybe someone else will give another option, but if I understand
>>>>>> > correctly what you want to solve, then you essentially have to do
>>>>>> either:
>>>>>> >
>>>>>> >  a) use the compare & swap mechanism in the sink you described
>>>>>> >
>>>>>> >  b) use a buffer to buffer elements inside the outputting ParDo and
>>>>>> > only output them when watermark passes (using a timer).
>>>>>> >
>>>>>> > There is actually an ongoing discussion about how to make option b)
>>>>>> > user-friendly and part of Beam itself, but currently there is no
>>>>>> > out-of-the-box solution for that.
>>>>>> >
>>>>>> > Jan
>>>>>> >
>>>>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>>>>> >> Suppose I trigger a Combine per-element (in a high-volume stream)
>>>>>> and
>>>>>> >> use a ParDo as a sink.
>>>>>> >>
>>>>>> >> I assume there is no guarantee about the order that my ParDo will
>>>>>> see
>>>>>> >> these triggers, especially as it processes in parallel, anyway.
>>>>>> >>
>>>>>> >> That said, my sink writes to a db or cache and I would not like
>>>>>> the
>>>>>> >> cache to ever regress its value to something "before" what it has
>>>>>> >> already written.
>>>>>> >>
>>>>>> >> Is the best way to solve this problem to always write the
>>>>>> event-time
>>>>>> >> in the cache and do a compare-and-swap only updating the sink if
>>>>>> the
>>>>>> >> triggered value in-hand is later than the target value?
>>>>>> >>
>>>>>> >> Or is there a better way to guarantee that my ParDo sink will
>>>>>> process
>>>>>> >> elements in-order? (Eg, if I can give up per-event/real-time, then
>>>>>> a
>>>>>> >> delay-based trigger would probably be sufficient I imagine.)
>>>>>> >>
>>>>>> >> Thanks for advice!
>>>>>>
>>>>>

Re: real real-time beam

Posted by Jan Lukavský <je...@seznam.cz>.
 > Trigger firings can have decreasing event timestamps w/ the minimum 
timestamp combiner*. I do think the issue at hand is best analyzed in 
terms of the explicit ordering on panes. And I do think we need to have 
an explicit guarantee or annotation strong enough to describe a 
correct-under-all-allowed runners sink. Today an antagonistic runner 
could probably break a lot of things.

Thanks for this insight. I didn't know about the relation between 
trigger firing (event) time - which is always non-decreasing - and the 
resulting timestamp of output pane - which can be affected by timestamp 
combiner and decrease in cases you describe. What actually correlates
with the pane index at all times is the processing time of trigger
firings. Would you say that the "annotation that would guarantee
ordering of panes" could be viewed as a time-ordering annotation with an
additional time domain (event time, processing time)? Could these two
then be viewed as a single one with some distinguishing parameter?

@RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)

?

Event time should probably be made the default, because that is
information that is accessible with every WindowedValue, while pane 
index is available only after GBK (or generally might be available after 
every keyed sequential operation, but is missing after source for instance).
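
A sink might then look like this (purely hypothetical sketch - the Domain
parameter is only the proposal above and does not exist in Beam today):

// Hypothetical: @RequiresTimeSortedInput with a Domain parameter is proposed,
// not implemented; writeToCache stands in for the real sink call.
@RequiresTimeSortedInput(Domain.EVENT_TIME)
class OrderedSink extends DoFn<KV<String, Long>, Void> {
  @ProcessElement
  public void process(ProcessContext c) {
    // Elements for each key arrive sorted in the chosen domain, so a plain
    // write can never regress the value already stored.
    writeToCache(c.element());
  }

  private void writeToCache(KV<String, Long> kv) { /* hypothetical */ }
}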

Jan

On 11/27/19 1:32 AM, Kenneth Knowles wrote:
>
>
> On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     > I will not try to formalize this notion in this email. But I
>     will note that since it is universally assured, it would be zero
>     cost and significantly safer to formalize it and add an annotation
>     noting it was required. It has nothing to do with event time
>     ordering, only trigger firing ordering.
>
>     I cannot agree with the last sentence (and I'm really not doing
>     this on purpose :-)). Panes generally arrive out of order, as
>     mentioned several times in the discussions linked from this
>     thread. If we want to ensure "trigger firing ordering", we can use
>     the pane index, that is correct. But - that is actually equivalent
>     to sorting by event time, because pane index order will be
>     (nearly) the same as event time order. This is due to the fact
>     that pane index and event time correlate (both are monotonic).
>
> Trigger firings can have decreasing event timestamps w/ the minimum 
> timestamp combiner*. I do think the issue at hand is best analyzed in 
> terms of the explicit ordering on panes. And I do think we need to 
> have an explicit guarantee or annotation strong enough to describe a 
> correct-under-all-allowed runners sink. Today an antagonistic runner 
> could probably break a lot of things.
>
> Kenn
>
> *In fact, they can decrease via the "maximum" timestamp combiner,
> because timestamp combiners actually only apply to the elements in
> that particular pane. This is weird, and maybe a design bug, but good
> to know about.
>
>     The pane index "only" solves the issue of preserving ordering even
>     in case where there are multiple firings within the same timestamp
>     (regardless of granularity). This was mentioned in the initial
>     discussion about event time ordering, and is part of the design
>     doc - users should be allowed to provide UDF for extracting
>     time-correlated ordering field (which means ability to choose a
>     preferred, or authoritative, observer which assigns unambiguous
>     ordering to events). Examples of this might include Kafka offsets
>     as well, or any queue index for that matter. This is not yet
>     implemented, but could (should) be in the future.
>
>     The only case where these two things are (somewhat) different is
>     the case mentioned by @Steve - if the output is a stateless ParDo,
>     which will get fused. But that is only because the processing is
>     single-threaded per key, and therefore the ordering is implied by
>     timer ordering (and careful here, many runners don't have this
>     ordering 100% correct, as of now - this problem luckily appears
>     only when there are multiple timers per key). Moreover, if there
>     should be a failure, then the output might (would) get back in
>     time anyway. If there would be a shuffle operation after
>     GBK/Combine, then the ordering is no longer guaranteed and must be
>     explicitly taken care of.
>
>     Last note, I must agree with @Rui that all these discussions are
>     very much related to retractions (precisely the ability to
>     implement them).
>
>     Jan
>
>     On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>>     Hi Aaron,
>>
>>     Another insightful observation.
>>
>>     Whenever an aggregation (GBK / Combine per key) has a trigger
>>     firing, there is a per-key sequence number attached. It is
>>     included in metadata known as "PaneInfo" [1]. The value of
>>     PaneInfo.getIndex() is colloquially referred to as the "pane
>>     index". You can also make use of the "on time index" if you like.
>>     The best way to access this metadata is to add a parameter of
>>     type PaneInfo to your DoFn's @ProcessElement method. This works
>>     for stateful or stateless DoFn.
>>
>>     Most of Beam's IO connectors do not explicitly enforce that
>>     outputs occur in pane index order but instead rely on the hope
>>     that the runner delivers panes in order to the sink. IMO this is
>>     dangerous but it has not yet caused a known issue. In practice,
>>     each "input key to output key 'path' " through a pipeline's logic
>>     does preserve order for all existing runners AFAIK and it is the
>>     formalization that is missing. It is related to an observation by
>>     +Rui Wang <ma...@google.com> that processing retractions
>>     requires the same key-to-key ordering.
>>
>>     I will not try to formalize this notion in this email. But I will
>>     note that since it is universally assured, it would be zero cost
>>     and significantly safer to formalize it and add an annotation
>>     noting it was required. It has nothing to do with event time
>>     ordering, only trigger firing ordering.
>>
>>     Kenn
>>
>>     [1]
>>     https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
>>     [2]
>>     https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>>
>>
>>     On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <pabloem@google.com
>>     <ma...@google.com>> wrote:
>>
>>         The blog posts on stateful and timely computation with Beam
>>         should help clarify a lot about how to use state and timers
>>         to do this:
>>         https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>         https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>
>>         You'll see there how there's an implicit per-single-element
>>         grouping for each key, so state and timers should support
>>         your use case very well.
>>
>>         Best
>>         -P.
>>
>>         On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz
>>         <sniemitz@apache.org <ma...@apache.org>> wrote:
>>
>>             If you have a pipeline that looks like Input ->
>>             GroupByKey -> ParDo, while it is not guaranteed, in
>>             practice the sink will observe the trigger firings in
>>             order (per key), since it'll be fused to the output of
>>             the GBK operation (in all runners I know of).
>>
>>             There have been a couple threads about trigger ordering
>>             as well on the list recently that might have more
>>             information:
>>             https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>             https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>
>>
>>             On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon
>>             <atdixon@gmail.com <ma...@gmail.com>> wrote:
>>
>>                 @Jan @Pablo Thank you
>>
>>                 @Pablo In this case it's a single global windowed
>>                 Combine/perKey, triggered per element. Keys are few
>>                 (client accounts) so they can live forever.
>>
>>                 It looks like just by virtue of using a stateful
>>                 ParDo I could get this final execution to be
>>                 "serialized" per key. (Then I could simply do the
>>                 compare-and-swap using Beam's state mechanism to keep
>>                 track of the "latest trigger timestamp" instead of
>>                 having to orchestrate compare-and-swap in the target
>>                 store :thinking:.)
>>
>>
>>
>>                 On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský
>>                 <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>
>>                     One addition, to make the list of options
>>                     exhaustive, there is probably
>>                     one more option
>>
>>                       c) create a ParDo keyed by primary key of your
>>                     sink, cache the last
>>                     write in there and compare it locally, without
>>                     the need to query the
>>                     database
>>
>>                     It would still need some timer to clear values
>>                     after watermark + allowed
>>                     lateness, because otherwise you would have to
>>                     cache your whole database
>>                     on workers. But because you don't need actual
>>                     ordering - you just need the most recent value
>>                     (if I got it right) - this might be an option.
>>
>>                     Jan
>>
>>                     On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>                     > Hi Aaron,
>>                     >
>>                     > maybe someone else will give another option,
>>                     but if I understand
>>                     > correctly what you want to solve, then you
>>                     essentially have to do either:
>>                     >
>>                     >  a) use the compare & swap mechanism in the
>>                     sink you described
>>                     >
>>                     >  b) use a buffer to buffer elements inside the
>>                     outputting ParDo and
>>                     > only output them when watermark passes (using a
>>                     timer).
>>                     >
>>                     > There is actually an ongoing discussion about
>>                     how to make option b)
>>                     > user-friendly and part of Beam itself, but
>>                     currently there is no
>>                     > out-of-the-box solution for that.
>>                     >
>>                     > Jan
>>                     >
>>                     > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>                     >> Suppose I trigger a Combine per-element (in a
>>                     high-volume stream) and
>>                     >> use a ParDo as a sink.
>>                     >>
>>                     >> I assume there is no guarantee about the order
>>                     that my ParDo will see
>>                     >> these triggers, especially as it processes in
>>                     parallel, anyway.
>>                     >>
>>                     >> That said, my sink writes to a db or cache and
>>                     I would not like the
>>                     >> cache to ever regress its value to something
>>                     "before" what it has
>>                     >> already written.
>>                     >>
>>                     >> Is the best way to solve this problem to
>>                     always write the event-time
>>                     >> in the cache and do a compare-and-swap only
>>                     updating the sink if the
>>                     >> triggered value in-hand is later than the
>>                     target value?
>>                     >>
>>                     >> Or is there a better way to guarantee that my
>>                     ParDo sink will process
>>                     >> elements in-order? (Eg, if I can give up
>>                     per-event/real-time, then a
>>                     >> delay-based trigger would probably be
>>                     sufficient I imagine.)
>>                     >>
>>                     >> Thanks for advice!
>>

Re: real real-time beam

Posted by Kenneth Knowles <ke...@apache.org>.
On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <je...@seznam.cz> wrote:

> > I will not try to formalize this notion in this email. But I will note
> that since it is universally assured, it would be zero cost and
> significantly safer to formalize it and add an annotation noting it was
> required. It has nothing to do with event time ordering, only trigger
> firing ordering.
>
> I cannot agree with the last sentence (and I'm really not doing this on
> purpose :-)). Panes generally arrive out of order, as mentioned several
> times in the discussions linked from this thread. If we want to ensure
> "trigger firing ordering", we can use the pane index, that is correct. But
> - that is actually equivalent to sorting by event time, because pane index
> order will be (nearly) the same as event time order. This is due to the
> fact that pane index and event time correlate (both are monotonic).
>
Trigger firings can have decreasing event timestamps w/ the minimum
timestamp combiner*. I do think the issue at hand is best analyzed in terms
of the explicit ordering on panes. And I do think we need to have an
explicit guarantee or annotation strong enough to describe a
correct-under-all-allowed runners sink. Today an antagonistic runner could
probably break a lot of things.

Kenn

*In fact, they can decrease via the "maximum" timestamp combiner, because
timestamp combiners actually only apply to the elements in that particular
pane. This is weird, and maybe a design bug, but good to know about.
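
For reference, the combiner in question is configured on the window, e.g.
(standard Java API; the per-element trigger here just mirrors Aaron's setup):

import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

static PCollection<KV<String, Long>> triggerPerElement(
    PCollection<KV<String, Long>> values) {
  return values.apply(
      Window.<KV<String, Long>>into(new GlobalWindows())
          .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
          .withAllowedLateness(Duration.ZERO)
          .accumulatingFiredPanes()
          // EARLIEST is the "minimum" combiner; since it considers only the
          // elements in the current pane, a later pane can carry an earlier
          // timestamp than the pane before it.
          .withTimestampCombiner(TimestampCombiner.EARLIEST));
}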


> The pane index "only" solves the issue of preserving ordering even in case
> where there are multiple firings within the same timestamp (regardless of
> granularity). This was mentioned in the initial discussion about event time
> ordering, and is part of the design doc - users should be allowed to
> provide UDF for extracting time-correlated ordering field (which means
> ability to choose a preferred, or authoritative, observer which assigns
> unambiguous ordering to events). Example of this might include Kafka
> offsets as well, or any queue index for that matter. This is not yet
> implemented, but could (should) be in the future.
>
> The only case where these two things are (somewhat) different is the case
> mentioned by @Steve - if the output is stateless ParDo, which will get
> fused. But that is only because the processing is single-threaded per key,
> and therefore the ordering is implied by timer ordering (and careful here,
> many runners don't have this ordering 100% correct, as of now - this
> problem luckily appears only when there are multiple timers per key).
> Moreover, if there should be a failure, then the output might (would) get
> back in time anyway. If there would be a shuffle operation after
> GBK/Combine, then the ordering is no longer guaranteed and must be
> explicitly taken care of.
>
> Last note, I must agree with @Rui that all these discussions are very much
> related to retractions (precisely the ability to implement them).
>
> Jan
> On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>
> Hi Aaron,
>
> Another insightful observation.
>
> Whenever an aggregation (GBK / Combine per key) has a trigger firing,
> there is a per-key sequence number attached. It is included in metadata
> known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially
> referred to as the "pane index". You can also make use of the "on time
> index" if you like. The best way to access this metadata is to add a
> parameter of type PaneInfo to your DoFn's @ProcessElement method. This
> works for stateful or stateless DoFn.
>
> Most of Beam's IO connectors do not explicitly enforce that outputs occur
> in pane index order but instead rely on the hope that the runner delivers
> panes in order to the sink. IMO this is dangerous but it has not yet caused
> a known issue. In practice, each "input key to output key 'path' " through
> a pipeline's logic does preserve order for all existing runners AFAIK and
> it is the formalization that is missing. It is related to an observation by +Rui
> Wang <ru...@google.com> that processing retractions requires the same
> key-to-key ordering.
>
> I will not try to formalize this notion in this email. But I will note
> that since it is universally assured, it would be zero cost and
> significantly safer to formalize it and add an annotation noting it was
> required. It has nothing to do with event time ordering, only trigger
> firing ordering.
>
> Kenn
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>
>
> On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <pa...@google.com> wrote:
>
>> The blog posts on stateful and timely computation with Beam should help
>> clarify a lot about how to use state and timers to do this:
>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>
>> You'll see there how there's an implicit per-single-element grouping for
>> each key, so state and timers should support your use case very well.
>>
>> Best
>> -P.
>>
>> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sn...@apache.org>
>> wrote:
>>
>>> If you have a pipeline that looks like Input -> GroupByKey -> ParDo,
>>> while it is not guaranteed, in practice the sink will observe the trigger
>>> firings in order (per key), since it'll be fused to the output of the GBK
>>> operation (in all runners I know of).
>>>
>>> There have been a couple threads about trigger ordering as well on the
>>> list recently that might have more information:
>>>
>>> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>>
>>> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>>
>>>
>>> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <at...@gmail.com> wrote:
>>>
>>>> @Jan @Pablo Thank you
>>>>
>>>> @Pablo In this case it's a single global windowed Combine/perKey,
>>>> triggered per element. Keys are few (client accounts) so they can live
>>>> forever.
>>>>
>>>> It looks like just by virtue of using a stateful ParDo I could get this
>>>> final execution to be "serialized" per key. (Then I could simply do the
>>>> compare-and-swap using Beam's state mechanism to keep track of the "latest
>>>> trigger timestamp" instead of having to orchestrate compare-and-swap in the
>>>> target store :thinking:.)
>>>>
>>>>
>>>>
>>>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> One addition, to make the list of options exhaustive, there is
>>>>> probably
>>>>> one more option
>>>>>
>>>>>   c) create a ParDo keyed by primary key of your sink, cache the last
>>>>> write in there and compare it locally, without the need to query the
>>>>> database
>>>>>
>>>>> It would still need some timer to clear values after watermark +
>>>>> allowed
>>>>> lateness, because otherwise you would have to cache your whole
>>>>> database
>>>>> on workers. But because you don't need actual ordering, you just need
>>>>> the most recent value (if I got it right) this might be an option.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>>>> > Hi Aaron,
>>>>> >
>>>>> > maybe someone else will give another option, but if I understand
>>>>> > correctly what you want to solve, then you essentially have to do
>>>>> either:
>>>>> >
>>>>> >  a) use the compare & swap mechanism in the sink you described
>>>>> >
>>>>> >  b) use a buffer to buffer elements inside the outputting ParDo and
>>>>> > only output them when watermark passes (using a timer).
>>>>> >
>>>>> > There is actually an ongoing discussion about how to make option b)
>>>>> > user-friendly and part of Beam itself, but currently there is no
>>>>> > out-of-the-box solution for that.
>>>>> >
>>>>> > Jan
>>>>> >
>>>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>>>> >> Suppose I trigger a Combine per-element (in a high-volume stream)
>>>>> and
>>>>> >> use a ParDo as a sink.
>>>>> >>
>>>>> >> I assume there is no guarantee about the order that my ParDo will
>>>>> see
>>>>> >> these triggers, especially as it processes in parallel, anyway.
>>>>> >>
>>>>> >> That said, my sink writes to a db or cache and I would not like the
>>>>> >> cache to ever regress its value to something "before" what it has
>>>>> >> already written.
>>>>> >>
>>>>> >> Is the best way to solve this problem to always write the
>>>>> event-time
>>>>> >> in the cache and do a compare-and-swap only updating the sink if
>>>>> the
>>>>> >> triggered value in-hand is later than the target value?
>>>>> >>
>>>>> >> Or is there a better way to guarantee that my ParDo sink will
>>>>> process
>>>>> >> elements in-order? (Eg, if I can give up per-event/real-time, then
>>>>> a
>>>>> >> delay-based trigger would probably be sufficient I imagine.)
>>>>> >>
>>>>> >> Thanks for advice!
>>>>>
>>>>

Re: real real-time beam

Posted by Jan Lukavský <je...@seznam.cz>.
 > I will not try to formalize this notion in this email. But I will 
note that since it is universally assured, it would be zero cost and 
significantly safer to formalize it and add an annotation noting it was 
required. It has nothing to do with event time ordering, only trigger 
firing ordering.

I cannot agree with the last sentence (and I'm really not doing this on 
purpose :-)). Panes generally arrive out of order, as mentioned several 
times in the discussions linked from this thread. If we want to ensure 
"trigger firing ordering", we can use the pane index; that is correct. 
But that is actually equivalent to sorting by event time, because pane 
index order will be (nearly) the same as event time order. This is due 
to the fact that pane index and event time correlate (both are 
monotonic). The pane index "only" solves the issue of preserving 
ordering even in cases where there are multiple firings within the same 
timestamp (regardless of granularity). This was mentioned in the initial 
discussion about event time ordering, and is part of the design doc - 
users should be allowed to provide a UDF for extracting a time-correlated 
ordering field (which means the ability to choose a preferred, or 
authoritative, observer which assigns an unambiguous ordering to events). 
Examples of this include Kafka offsets, or any queue index for that 
matter. This is not yet implemented, but could (should) be in the future.

The only case where these two things are (somewhat) different is the 
case mentioned by @Steve - if the output is a stateless ParDo, which 
will get fused. But that is only because the processing is 
single-threaded per key, and therefore the ordering is implied by timer 
ordering (and be careful here: many runners don't have this ordering 
100% correct as of now - this problem luckily appears only when there 
are multiple timers per key). Moreover, if there were a failure, the 
output might (indeed, would) go back in time anyway. If there were a 
shuffle operation after the GBK/Combine, the ordering would no longer be 
guaranteed and would have to be explicitly taken care of.

One last note: I must agree with @Rui that all these discussions are 
very much related to retractions (precisely, to the ability to implement 
them).

Jan

On 11/26/19 7:34 AM, Kenneth Knowles wrote:
> Hi Aaron,
>
> Another insightful observation.
>
> Whenever an aggregation (GBK / Combine per key) has a trigger firing, 
> there is a per-key sequence number attached. It is included in 
> metadata known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is 
> colloquially referred to as the "pane index". You can also make use of 
> the "on time index" if you like. The best way to access this metadata 
> is to add a parameter of type PaneInfo to your DoFn's @ProcessElement 
> method. This works for stateful or stateless DoFn.
>
> Most of Beam's IO connectors do not explicitly enforce that outputs 
> occur in pane index order but instead rely on the hope that the runner 
> delivers panes in order to the sink. IMO this is dangerous but it has 
> not yet caused a known issue. In practice, each "input key to output 
> key 'path' " through a pipeline's logic does preserve order for all 
> existing runners AFAIK and it is the formalization that is missing. It 
> is related to an observation by +Rui Wang 
> <ma...@google.com> that processing retractions requires the 
> same key-to-key ordering.
>
> I will not try to formalize this notion in this email. But I will note 
> that since it is universally assured, it would be zero cost and 
> significantly safer to formalize it and add an annotation noting it 
> was required. It has nothing to do with event time ordering, only 
> trigger firing ordering.
>
> Kenn
>
> [1] 
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
> [2] 
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>
>
> On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <pabloem@google.com 
> <ma...@google.com>> wrote:
>
>     The blog posts on stateful and timely computation with Beam should
>     help clarify a lot about how to use state and timers to do this:
>     https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>     https://beam.apache.org/blog/2017/08/28/timely-processing.html
>
>     You'll see there how there's an implicit per-single-element
>     grouping for each key, so state and timers should support your use
>     case very well.
>
>     Best
>     -P.
>
>     On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sniemitz@apache.org
>     <ma...@apache.org>> wrote:
>
>         If you have a pipeline that looks like Input -> GroupByKey ->
>         ParDo, while it is not guaranteed, in practice the sink will
>         observe the trigger firings in order (per key), since it'll be
>         fused to the output of the GBK operation (in all runners I
>         know of).
>
>         There have been a couple threads about trigger ordering as
>         well on the list recently that might have more information:
>         https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>         https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>
>
>         On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <atdixon@gmail.com
>         <ma...@gmail.com>> wrote:
>
>             @Jan @Pablo Thank you
>
>             @Pablo In this case it's a single global windowed
>             Combine/perKey, triggered per element. Keys are few
>             (client accounts) so they can live forever.
>
>             It looks like just by virtue of using a stateful ParDo I
>             could get this final execution to be "serialized" per key.
>             (Then I could simply do the compare-and-swap using Beam's
>             state mechanism to keep track of the "latest trigger
>             timestamp" instead of having to orchestrate
>             compare-and-swap in the target store :thinking:.)
>
>
>
>             On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský
>             <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>
>                 One addition, to make the list of options exhaustive,
>                 there is probably
>                 one more option
>
>                   c) create a ParDo keyed by primary key of your sink,
>                 cache the last
>                 write in there and compare it locally, without the
>                 need to query the
>                 database
>
>                 It would still need some timer to clear values after
>                 watermark + allowed
>                 lateness, because otherwise you would have to cache
>                 your whole database
>                 on workers. But because you don't need actual
>                 ordering, you just need
>                 the most recent value (if I got it right) this might
>                 be an option.
>
>                 Jan
>
>                 On 11/25/19 10:53 PM, Jan Lukavský wrote:
>                 > Hi Aaron,
>                 >
>                 > maybe someone else will give another option, but if
>                 I understand
>                 > correctly what you want to solve, then you
>                 essentially have to do either:
>                 >
>                 >  a) use the compare & swap mechanism in the sink you
>                 described
>                 >
>                 >  b) use a buffer to buffer elements inside the
>                 outputting ParDo and
>                 > only output them when watermark passes (using a timer).
>                 >
>                 > There is actually an ongoing discussion about how to
>                 make option b)
>                 > user-friendly and part of Beam itself, but currently
>                 there is no
>                 > out-of-the-box solution for that.
>                 >
>                 > Jan
>                 >
>                 > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>                 >> Suppose I trigger a Combine per-element (in a
>                 high-volume stream) and
>                 >> use a ParDo as a sink.
>                 >>
>                 >> I assume there is no guarantee about the order that
>                 my ParDo will see
>                 >> these triggers, especially as it processes in
>                 parallel, anyway.
>                 >>
>                 >> That said, my sink writes to a db or cache and I
>                 would not like the
>                 >> cache to ever regress its value to something
>                 "before" what it has
>                 >> already written.
>                 >>
>                 >> Is the best way to solve this problem to always
>                 write the event-time
>                 >> in the cache and do a compare-and-swap only
>                 updating the sink if the
>                 >> triggered value in-hand is later than the target value?
>                 >>
>                 >> Or is there a better way to guarantee that my ParDo
>                 sink will process
>                 >> elements in-order? (Eg, if I can give up
>                 per-event/real-time, then a
>                 >> delay-based trigger would probably be sufficient I
>                 imagine.)
>                 >>
>                 >> Thanks for advice!
>

Re: real real-time beam

Posted by Kenneth Knowles <ke...@apache.org>.
Hi Aaron,

Another insightful observation.

Whenever an aggregation (GBK / Combine per key) has a trigger firing, there
is a per-key sequence number attached. It is included in metadata known as
"PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially referred
to as the "pane index". You can also make use of the "on time index" if you
like. The best way to access this metadata is to add a parameter of type
PaneInfo to your DoFn's @ProcessElement method. This works for stateful or
stateless DoFn.
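
As an illustration of accessing that metadata (a sketch only; the element
type and the writeIfNewer helper are hypothetical, not part of Beam):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.values.KV;

    class PaneAwareSinkFn extends DoFn<KV<String, Long>, Void> {
      @ProcessElement
      public void processElement(
          @Element KV<String, Long> element, PaneInfo pane) {
        // Per-key sequence number of this trigger firing, starting at 0.
        long paneIndex = pane.getIndex();
        // A sink can persist the pane index next to the value and only
        // overwrite rows whose stored index is smaller.
        writeIfNewer(element.getKey(), element.getValue(), paneIndex);
      }

      private void writeIfNewer(String key, long value, long paneIndex) {
        // Hypothetical conditional write to an external store.
      }
    }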

Most of Beam's IO connectors do not explicitly enforce that outputs occur
in pane index order but instead rely on the hope that the runner delivers
panes in order to the sink. IMO this is dangerous but it has not yet caused
a known issue. In practice, each "input key to output key" path through
a pipeline's logic does preserve order for all existing runners AFAIK, and
it is the formalization that is missing. It is related to an observation
by +Rui Wang <ru...@google.com> that processing retractions requires the
same key-to-key ordering.

I will not try to formalize this notion in this email. But I will note that
since it is universally assured, it would be zero cost and significantly
safer to formalize it and add an annotation noting it was required. It has
nothing to do with event time ordering, only trigger firing ordering.

Kenn

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557


On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <pa...@google.com> wrote:

> The blog posts on stateful and timely computation with Beam should help
> clarify a lot about how to use state and timers to do this:
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>
> You'll see there how there's an implicit per-single-element grouping for
> each key, so state and timers should support your use case very well.
>
> Best
> -P.
>
> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sn...@apache.org> wrote:
>
>> If you have a pipeline that looks like Input -> GroupByKey -> ParDo,
>> while it is not guaranteed, in practice the sink will observe the trigger
>> firings in order (per key), since it'll be fused to the output of the GBK
>> operation (in all runners I know of).
>>
>> There have been a couple threads about trigger ordering as well on the
>> list recently that might have more information:
>>
>> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>
>> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>
>>
>> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <at...@gmail.com> wrote:
>>
>>> @Jan @Pablo Thank you
>>>
>>> @Pablo In this case it's a single global windowed Combine/perKey,
>>> triggered per element. Keys are few (client accounts) so they can live
>>> forever.
>>>
>>> It looks like just by virtue of using a stateful ParDo I could get this
>>> final execution to be "serialized" per key. (Then I could simply do the
>>> compare-and-swap using Beam's state mechanism to keep track of the "latest
>>> trigger timestamp" instead of having to orchestrate compare-and-swap in the
>>> target store :thinking:.)
>>>
>>>
>>>
>>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> One addition, to make the list of options exhaustive, there is probably
>>>> one more option
>>>>
>>>>   c) create a ParDo keyed by primary key of your sink, cache the last
>>>> write in there and compare it locally, without the need to query the
>>>> database
>>>>
>>>> It would still need some timer to clear values after watermark +
>>>> allowed
>>>> lateness, because otherwise you would have to cache your whole database
>>>> on workers. But because you don't need actual ordering, you just need
>>>> the most recent value (if I got it right) this might be an option.
>>>>
>>>> Jan
>>>>
>>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>>> > Hi Aaron,
>>>> >
>>>> > maybe someone else will give another option, but if I understand
>>>> > correctly what you want to solve, then you essentially have to do
>>>> either:
>>>> >
>>>> >  a) use the compare & swap mechanism in the sink you described
>>>> >
>>>> >  b) use a buffer to buffer elements inside the outputting ParDo and
>>>> > only output them when watermark passes (using a timer).
>>>> >
>>>> > There is actually an ongoing discussion about how to make option b)
>>>> > user-friendly and part of Beam itself, but currently there is no
>>>> > out-of-the-box solution for that.
>>>> >
>>>> > Jan
>>>> >
>>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>>> >> Suppose I trigger a Combine per-element (in a high-volume stream)
>>>> and
>>>> >> use a ParDo as a sink.
>>>> >>
>>>> >> I assume there is no guarantee about the order that my ParDo will
>>>> see
>>>> >> these triggers, especially as it processes in parallel, anyway.
>>>> >>
>>>> >> That said, my sink writes to a db or cache and I would not like the
>>>> >> cache to ever regress its value to something "before" what it has
>>>> >> already written.
>>>> >>
>>>> >> Is the best way to solve this problem to always write the event-time
>>>> >> in the cache and do a compare-and-swap only updating the sink if the
>>>> >> triggered value in-hand is later than the target value?
>>>> >>
>>>> >> Or is there a better way to guarantee that my ParDo sink will
>>>> process
>>>> >> elements in-order? (Eg, if I can give up per-event/real-time, then a
>>>> >> delay-based trigger would probably be sufficient I imagine.)
>>>> >>
>>>> >> Thanks for advice!
>>>>
>>>

Re: real real-time beam

Posted by Pablo Estrada <pa...@google.com>.
The blog posts on stateful and timely computation with Beam should help
clarify a lot about how to use state and timers to do this:
https://beam.apache.org/blog/2017/02/13/stateful-processing.html
https://beam.apache.org/blog/2017/08/28/timely-processing.html

You'll see there how there's an implicit per-single-element grouping for
each key, so state and timers should support your use case very well.

Best
-P.

On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sn...@apache.org> wrote:

> If you have a pipeline that looks like Input -> GroupByKey -> ParDo, while
> it is not guaranteed, in practice the sink will observe the trigger firings
> in order (per key), since it'll be fused to the output of the GBK operation
> (in all runners I know of).
>
> There have been a couple threads about trigger ordering as well on the
> list recently that might have more information:
>
> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>
> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>
>
> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <at...@gmail.com> wrote:
>
>> @Jan @Pablo Thank you
>>
>> @Pablo In this case it's a single global windowed Combine/perKey,
>> triggered per element. Keys are few (client accounts) so they can live
>> forever.
>>
>> It looks like just by virtue of using a stateful ParDo I could get this
>> final execution to be "serialized" per key. (Then I could simply do the
>> compare-and-swap using Beam's state mechanism to keep track of the "latest
>> trigger timestamp" instead of having to orchestrate compare-and-swap in the
>> target store :thinking:.)
>>
>>
>>
>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> One addition, to make the list of options exhaustive, there is probably
>>> one more option
>>>
>>>   c) create a ParDo keyed by primary key of your sink, cache the last
>>> write in there and compare it locally, without the need to query the
>>> database
>>>
>>> It would still need some timer to clear values after watermark + allowed
>>> lateness, because otherwise you would have to cache your whole database
>>> on workers. But because you don't need actual ordering, you just need
>>> the most recent value (if I got it right) this might be an option.
>>>
>>> Jan
>>>
>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>> > Hi Aaron,
>>> >
>>> > maybe someone else will give another option, but if I understand
>>> > correctly what you want to solve, then you essentially have to do
>>> either:
>>> >
>>> >  a) use the compare & swap mechanism in the sink you described
>>> >
>>> >  b) use a buffer to buffer elements inside the outputting ParDo and
>>> > only output them when watermark passes (using a timer).
>>> >
>>> > There is actually an ongoing discussion about how to make option b)
>>> > user-friendly and part of Beam itself, but currently there is no
>>> > out-of-the-box solution for that.
>>> >
>>> > Jan
>>> >
>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>> >> Suppose I trigger a Combine per-element (in a high-volume stream) and
>>> >> use a ParDo as a sink.
>>> >>
>>> >> I assume there is no guarantee about the order that my ParDo will see
>>> >> these triggers, especially as it processes in parallel, anyway.
>>> >>
>>> >> That said, my sink writes to a db or cache and I would not like the
>>> >> cache to ever regress its value to something "before" what it has
>>> >> already written.
>>> >>
>>> >> Is the best way to solve this problem to always write the event-time
>>> >> in the cache and do a compare-and-swap only updating the sink if the
>>> >> triggered value in-hand is later than the target value?
>>> >>
>>> >> Or is there a better way to guarantee that my ParDo sink will process
>>> >> elements in-order? (Eg, if I can give up per-event/real-time, then a
>>> >> delay-based trigger would probably be sufficient I imagine.)
>>> >>
>>> >> Thanks for advice!
>>>
>>

Re: real real-time beam

Posted by Steve Niemitz <sn...@apache.org>.
If you have a pipeline that looks like Input -> GroupByKey -> ParDo, while
it is not guaranteed, in practice the sink will observe the trigger firings
in order (per key), since it'll be fused to the output of the GBK operation
(in all runners I know of).
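
Sketched as code (transform names and types are placeholders, not from the
original message):

    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;

    // Input -> GroupByKey -> ParDo. The ParDo is typically fused to the
    // GBK output, which is why per-key firings are observed in order in
    // practice, even though the model does not guarantee it.
    input
        .apply(GroupByKey.<String, Long>create())
        .apply(ParDo.of(new SinkFn()));  // SinkFn: a placeholder DoFn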

There have been a couple threads about trigger ordering as well on the list
recently that might have more information:
https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E


On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <at...@gmail.com> wrote:

> @Jan @Pablo Thank you
>
> @Pablo In this case it's a single global windowed Combine/perKey,
> triggered per element. Keys are few (client accounts) so they can live
> forever.
>
> It looks like just by virtue of using a stateful ParDo I could get this
> final execution to be "serialized" per key. (Then I could simply do the
> compare-and-swap using Beam's state mechanism to keep track of the "latest
> trigger timestamp" instead of having to orchestrate compare-and-swap in the
> target store :thinking:.)
>
>
>
> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> One addition, to make the list of options exhaustive, there is probably
>> one more option
>>
>>   c) create a ParDo keyed by primary key of your sink, cache the last
>> write in there and compare it locally, without the need to query the
>> database
>>
>> It would still need some timer to clear values after watermark + allowed
>> lateness, because otherwise you would have to cache your whole database
>> on workers. But because you don't need actual ordering, you just need
>> the most recent value (if I got it right) this might be an option.
>>
>> Jan
>>
>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>> > Hi Aaron,
>> >
>> > maybe someone else will give another option, but if I understand
>> > correctly what you want to solve, then you essentially have to do
>> either:
>> >
>> >  a) use the compare & swap mechanism in the sink you described
>> >
>> >  b) use a buffer to buffer elements inside the outputting ParDo and
>> > only output them when watermark passes (using a timer).
>> >
>> > There is actually an ongoing discussion about how to make option b)
>> > user-friendly and part of Beam itself, but currently there is no
>> > out-of-the-box solution for that.
>> >
>> > Jan
>> >
>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>> >> Suppose I trigger a Combine per-element (in a high-volume stream) and
>> >> use a ParDo as a sink.
>> >>
>> >> I assume there is no guarantee about the order that my ParDo will see
>> >> these triggers, especially as it processes in parallel, anyway.
>> >>
>> >> That said, my sink writes to a db or cache and I would not like the
>> >> cache to ever regress its value to something "before" what it has
>> >> already written.
>> >>
>> >> Is the best way to solve this problem to always write the event-time
>> >> in the cache and do a compare-and-swap only updating the sink if the
>> >> triggered value in-hand is later than the target value?
>> >>
>> >> Or is there a better way to guarantee that my ParDo sink will process
>> >> elements in-order? (Eg, if I can give up per-event/real-time, then a
>> >> delay-based trigger would probably be sufficient I imagine.)
>> >>
>> >> Thanks for advice!
>>
>

Re: real real-time beam

Posted by Aaron Dixon <at...@gmail.com>.
@Jan @Pablo Thank you

@Pablo In this case it's a single global windowed Combine/perKey, triggered
per element. Keys are few (client accounts) so they can live forever.

It looks like just by virtue of using a stateful ParDo I could get this
final execution to be "serialized" per key. (Then I could simply do the
compare-and-swap using Beam's state mechanism to keep track of the "latest
trigger timestamp" instead of having to orchestrate compare-and-swap in the
target store :thinking:.)
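
A sketch of that approach (assumptions: the input is a KV, the element
timestamp is the trigger timestamp of interest, and writeToCache is a
hypothetical external write):

    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Instant;

    class CompareAndSwapSinkFn extends DoFn<KV<String, Long>, Void> {
      @StateId("latestTimestamp")
      private final StateSpec<ValueState<Instant>> latestTimestampSpec =
          StateSpecs.value();

      @ProcessElement
      public void processElement(
          @Element KV<String, Long> element,
          @Timestamp Instant timestamp,
          @StateId("latestTimestamp") ValueState<Instant> latest) {
        Instant seen = latest.read();
        // Stateful ParDo execution is serialized per key, so this
        // read-compare-write needs no coordination in the target store.
        if (seen == null || timestamp.isAfter(seen)) {
          latest.write(timestamp);
          writeToCache(element.getKey(), element.getValue());
        }
      }

      private void writeToCache(String key, long value) {
        // Hypothetical write to the external cache/db.
      }
    }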



On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:

> One addition, to make the list of options exhaustive, there is probably
> one more option
>
>   c) create a ParDo keyed by primary key of your sink, cache the last
> write in there and compare it locally, without the need to query the
> database
>
> It would still need some timer to clear values after watermark + allowed
> lateness, because otherwise you would have to cache your whole database
> on workers. But because you don't need actual ordering, you just need
> the most recent value (if I got it right) this might be an option.
>
> Jan
>
> On 11/25/19 10:53 PM, Jan Lukavský wrote:
> > Hi Aaron,
> >
> > maybe someone else will give another option, but if I understand
> > correctly what you want to solve, then you essentially have to do either:
> >
> >  a) use the compare & swap mechanism in the sink you described
> >
> >  b) use a buffer to buffer elements inside the outputting ParDo and
> > only output them when watermark passes (using a timer).
> >
> > There is actually an ongoing discussion about how to make option b)
> > user-friendly and part of Beam itself, but currently there is no
> > out-of-the-box solution for that.
> >
> > Jan
> >
> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
> >> Suppose I trigger a Combine per-element (in a high-volume stream) and
> >> use a ParDo as a sink.
> >>
> >> I assume there is no guarantee about the order that my ParDo will see
> >> these triggers, especially as it processes in parallel, anyway.
> >>
> >> That said, my sink writes to a db or cache and I would not like the
> >> cache to ever regress its value to something "before" what it has
> >> already written.
> >>
> >> Is the best way to solve this problem to always write the event-time
> >> in the cache and do a compare-and-swap only updating the sink if the
> >> triggered value in-hand is later than the target value?
> >>
> >> Or is there a better way to guarantee that my ParDo sink will process
> >> elements in-order? (Eg, if I can give up per-event/real-time, then a
> >> delay-based trigger would probably be sufficient I imagine.)
> >>
> >> Thanks for advice!
>

Re: real real-time beam

Posted by Jan Lukavský <je...@seznam.cz>.
One addition, to make the list of options exhaustive, there is probably 
one more option

  c) create a ParDo keyed by the primary key of your sink, cache the 
last write in there, and compare it locally, without the need to query 
the database

It would still need some timer to clear values after watermark + allowed 
lateness, because otherwise you would have to cache your whole database 
on workers. But because you don't need actual ordering, just the most 
recent value (if I got it right), this might be an option.
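
A rough sketch of option c) with the cleanup timer (all names are
hypothetical, the allowed lateness is an assumed constant, a non-global
window is assumed so the watermark eventually passes its end, and the
value-equality check stands in for whatever local comparison is wanted):

    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    class LocalCacheFn extends DoFn<KV<String, Long>, KV<String, Long>> {
      // Assumed value; use the pipeline's actual allowed lateness.
      private static final Duration ALLOWED_LATENESS = Duration.standardHours(1);

      @StateId("lastWritten")
      private final StateSpec<ValueState<Long>> lastWrittenSpec =
          StateSpecs.value();

      @TimerId("gc")
      private final TimerSpec gcSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void processElement(
          @Element KV<String, Long> element,
          @StateId("lastWritten") ValueState<Long> lastWritten,
          @TimerId("gc") Timer gc,
          BoundedWindow window,
          OutputReceiver<KV<String, Long>> out) {
        Long previous = lastWritten.read();
        if (previous == null || !previous.equals(element.getValue())) {
          lastWritten.write(element.getValue());
          out.output(element);  // only forward values that actually changed
        }
        // Drop the cached value once the window has expired, so the
        // whole database is never cached on workers.
        gc.set(window.maxTimestamp().plus(ALLOWED_LATENESS));
      }

      @OnTimer("gc")
      public void onGc(@StateId("lastWritten") ValueState<Long> lastWritten) {
        lastWritten.clear();
      }
    }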

Jan

On 11/25/19 10:53 PM, Jan Lukavský wrote:
> Hi Aaron,
>
> maybe someone else will give another option, but if I understand 
> correctly what you want to solve, then you essentially have to do either:
>
>  a) use the compare & swap mechanism in the sink you described
>
>  b) use a buffer to buffer elements inside the outputting ParDo and 
> only output them when watermark passes (using a timer).
>
> There is actually an ongoing discussion about how to make option b) 
> user-friendly and part of Beam itself, but currently there is no 
> out-of-the-box solution for that.
>
> Jan
>
> On 11/25/19 10:27 PM, Aaron Dixon wrote:
>> Suppose I trigger a Combine per-element (in a high-volume stream) and 
>> use a ParDo as a sink.
>>
>> I assume there is no guarantee about the order that my ParDo will see 
>> these triggers, especially as it processes in parallel, anyway.
>>
>> That said, my sink writes to a db or cache and I would not like the 
>> cache to ever regress its value to something "before" what it has 
>> already written.
>>
>> Is the best way to solve this problem to always write the event-time 
>> in the cache and do a compare-and-swap only updating the sink if the 
>> triggered value in-hand is later than the target value?
>>
>> Or is there a better way to guarantee that my ParDo sink will process 
>> elements in-order? (Eg, if I can give up per-event/real-time, then a 
>> delay-based trigger would probably be sufficient I imagine.)
>>
>> Thanks for advice!

Re: real real-time beam

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Aaron,

maybe someone else will give another option, but if I understand 
correctly what you want to solve, then you essentially have to do either:

  a) use the compare & swap mechanism in the sink you described

  b) buffer elements inside the outputting ParDo and only output them 
when the watermark passes (using a timer).

There is actually an ongoing discussion about how to make option b) 
user-friendly and part of Beam itself, but currently there is no 
out-of-the-box solution for that.
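
For option b), a minimal sketch of buffering with state and an event-time
timer (names and types are assumptions, and a non-global window is assumed
so the watermark eventually passes the end of the window):

    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.KV;

    class BufferUntilWatermarkFn
        extends DoFn<KV<String, Long>, KV<String, Long>> {
      @StateId("buffer")
      private final StateSpec<BagState<KV<String, Long>>> bufferSpec =
          StateSpecs.bag();

      @TimerId("flush")
      private final TimerSpec flushSpec =
          TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void processElement(
          @Element KV<String, Long> element,
          @StateId("buffer") BagState<KV<String, Long>> buffer,
          @TimerId("flush") Timer flush,
          BoundedWindow window) {
        buffer.add(element);
        // Deliver everything only once the watermark passes the end of
        // the window, so no earlier firing can arrive after a later one.
        flush.set(window.maxTimestamp());
      }

      @OnTimer("flush")
      public void onFlush(
          OnTimerContext context,
          @StateId("buffer") BagState<KV<String, Long>> buffer) {
        for (KV<String, Long> element : buffer.read()) {
          context.output(element);
        }
        buffer.clear();
      }
    }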

Jan

On 11/25/19 10:27 PM, Aaron Dixon wrote:
> Suppose I trigger a Combine per-element (in a high-volume stream) and 
> use a ParDo as a sink.
>
> I assume there is no guarantee about the order that my ParDo will see 
> these triggers, especially as it processes in parallel, anyway.
>
> That said, my sink writes to a db or cache and I would not like the 
> cache to ever regress its value to something "before" what it has 
> already written.
>
> Is the best way to solve this problem to always write the event-time 
> in the cache and do a compare-and-swap only updating the sink if the 
> triggered value in-hand is later than the target value?
>
> Or is there a better way to guarantee that my ParDo sink will process 
> elements in-order? (Eg, if I can give up per-event/real-time, then a 
> delay-based trigger would probably be sufficient I imagine.)
>
> Thanks for advice!