Posted to dev@beam.apache.org by Aaron Dixon <at...@gmail.com> on 2019/11/25 21:27:03 UTC
real real-time beam
Suppose I trigger a Combine per-element (in a high-volume stream) and use a
ParDo as a sink.
I assume there is no guarantee about the order that my ParDo will see these
triggers, especially as it processes in parallel, anyway.
That said, my sink writes to a db or cache and I would not like the cache
to ever regress its value to something "before" what it has already written.
Is the best way to solve this problem to always write the event-time in the
cache and do a compare-and-swap only updating the sink if the triggered
value in-hand is later than the target value?
Or is there a better way to guarantee that my ParDo sink will process
elements in-order? (Eg, if I can give up per-event/real-time, then a
delay-based trigger would probably be sufficient I imagine.)
Thanks for advice!
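The compare-and-swap idea described above can be sketched independently of Beam: the sink records the event time of the last accepted write per key and rejects anything older. A minimal plain-Java sketch; the names (TimestampedCache, maybeWrite) are illustrative, not Beam or any real cache API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the compare-and-swap guard described above: a write is applied
// only if its event time is strictly newer than the last accepted write for
// that key. Class and method names are illustrative, not a real API.
class TimestampedCache {
    // key -> event time (millis) of the last accepted write
    private final Map<String, Long> lastEventTime = new ConcurrentHashMap<>();
    private final Map<String, String> values = new ConcurrentHashMap<>();

    /** Returns true if the write was accepted, false if it would regress. */
    boolean maybeWrite(String key, String value, long eventTimeMillis) {
        boolean[] accepted = {false};
        // compute() runs atomically per key in ConcurrentHashMap
        lastEventTime.compute(key, (k, prev) -> {
            if (prev == null || eventTimeMillis > prev) {
                values.put(k, value);
                accepted[0] = true;
                return eventTimeMillis;
            }
            return prev; // stale trigger firing: keep the newer timestamp
        });
        return accepted[0];
    }

    String get(String key) {
        return values.get(key);
    }
}
```

In a real sink the compare-and-swap would have to happen in the target store itself (or in Beam state, as discussed later in the thread), since workers do not share this in-memory map.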
Re: real real-time beam
Posted by Pablo Estrada <pa...@google.com>.
If I understand correctly - your pipeline has some kind of windowing, and
on every trigger downstream of the combiner, the pipeline updates a cache
with a single, non-windowed value. Is that correct?
What are your keys for this pipeline? You could work this out with, as you
noted, a timer that fires periodically, and keeps some state with the value
that you want to update to the cache.
Is this a Python or Java pipeline? What is the runner?
Best
-P.
On Mon, Nov 25, 2019 at 1:27 PM Aaron Dixon <at...@gmail.com> wrote:
> Suppose I trigger a Combine per-element (in a high-volume stream) and use
> a ParDo as a sink.
>
> I assume there is no guarantee about the order that my ParDo will see
> these triggers, especially as it processes in parallel, anyway.
>
> That said, my sink writes to a db or cache and I would not like the cache
> to ever regress its value to something "before" what it has already written.
>
> Is the best way to solve this problem to always write the event-time in
> the cache and do a compare-and-swap only updating the sink if the triggered
> value in-hand is later than the target value?
>
> Or is there a better way to guarantee that my ParDo sink will process
> elements in-order? (Eg, if I can give up per-event/real-time, then a
> delay-based trigger would probably be sufficient I imagine.)
>
> Thanks for advice!
>
Re: real real-time beam
Posted by Aaron Dixon <at...@gmail.com>.
> Aaron - do you have the information you need to implement your sink? My
> impression is that you have quite a good grasp of the issues even before
> you asked.
Yes, I do, thank you. I really appreciate the thorough help from everyone.
Thank you
On Wed, Dec 4, 2019 at 9:41 AM Jan Lukavský <je...@seznam.cz> wrote:
> Hi Kenn,
> On 12/4/19 5:38 AM, Kenneth Knowles wrote:
>
> Jan - let's try to defrag the threads on your time sorting proposal. This
> thread may have useful ideas but I want to focus on helping Aaron in this
> thread. You can link to this thread from other threads or from a design
> doc. Does this seem OK to you?
>
> sure. :-)
>
> I actually think the best thread to continue the discussion would be [1].
> The reason why this discussion probably got fragmented is that the other
> threads seem to die out without any conclusion. :-(
>
> Jan
>
> [1]
> https://lists.apache.org/thread.html/e2f729c7cea22553fc34421d4547132fa1c2ec01035eb4fb1a426873%40%3Cdev.beam.apache.org%3E
>
>
> Aaron - do you have the information you need to implement your sink? My
> impression is that you have quite a good grasp of the issues even before
> you asked.
>
> Kenn
>
> On Wed, Nov 27, 2019 at 3:05 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> > Trigger firings can have decreasing event timestamps w/ the minimum
>> timestamp combiner*. I do think the issue at hand is best analyzed in terms
>> of the explicit ordering on panes. And I do think we need to have an
>> explicit guarantee or annotation strong enough to describe a
>> correct-under-all-allowed runners sink. Today an antagonistic runner could
>> probably break a lot of things.
>>
>> Thanks for this insight. I didn't know about the relation between trigger
>> firing (event) time - which is always non-decreasing - and the resulting
>> timestamp of the output pane - which can be affected by the timestamp
>> combiner and decrease in the cases you describe. What actually correlates
>> with the pane index at all times is the processing time of the trigger
>> firings. Would you say that the "annotation that would guarantee ordering
>> of panes" could be viewed as a time-ordering annotation with an additional
>> time domain (event time, processing time)? Could these two then be viewed
>> as a single annotation with some distinguishing parameter?
>>
>> @RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)
>>
>> ?
>>
>> Event time should be probably made the default, because that is
>> information that is accessible with every WindowedValue, while pane index
>> is available only after GBK (or generally might be available after every
>> keyed sequential operation, but is missing after source for instance).
>>
>> Jan
>> On 11/27/19 1:32 AM, Kenneth Knowles wrote:
>>
>>
>>
>> On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> > I will not try to formalize this notion in this email. But I will note
>>> that since it is universally assured, it would be zero cost and
>>> significantly safer to formalize it and add an annotation noting it was
>>> required. It has nothing to do with event time ordering, only trigger
>>> firing ordering.
>>>
>>> I cannot agree with the last sentence (and I'm really not doing this on
>>> purpose :-)). Panes generally arrive out of order, as mentioned several
>>> times in the discussions linked from this thread. If we want to ensure
>>> "trigger firing ordering", we can use the pane index, that is correct. But
>>> - that is actually equivalent to sorting by event time, because pane index
>>> order will be (nearly) the same as event time order. This is due to the
>>> fact that pane index and event time correlate (both are monotonic).
>>>
>> Trigger firings can have decreasing event timestamps w/ the minimum
>> timestamp combiner*. I do think the issue at hand is best analyzed in terms
>> of the explicit ordering on panes. And I do think we need to have an
>> explicit guarantee or annotation strong enough to describe a
>> correct-under-all-allowed runners sink. Today an antagonistic runner could
>> probably break a lot of things.
>>
>> Kenn
>>
>> *In fact, they can decrease even with the "maximum" timestamp combiner,
>> because timestamp combiners only apply to the elements in that particular
>> pane. This is weird, and maybe a design bug, but good to know about.
>>
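Kenn's footnote can be illustrated with a toy computation: under a "minimum" timestamp combiner, each pane's output timestamp is combined over only that pane's own elements, so a later pane containing an earlier element produces a smaller timestamp than the pane before it. A plain-Java sketch (illustrative only, not Beam API):

```java
import java.util.List;

// Toy illustration: under a "minimum" timestamp combiner the output
// timestamp of a pane is the minimum over only that pane's own elements,
// so it can decrease across successive panes of the same window.
class MinCombinerDemo {
    static long paneOutputTimestamp(List<Long> paneElementTimestamps) {
        return paneElementTimestamps.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElseThrow(() -> new IllegalArgumentException("empty pane"));
    }
}
```

For example, a first pane holding elements at timestamps 10 and 12 outputs timestamp 10, while a later pane holding a single late element at timestamp 7 outputs 7 - a decrease.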
>>
>>> The pane index "only" solves the issue of preserving ordering even in
>>> case where there are multiple firings within the same timestamp (regardless
>>> of granularity). This was mentioned in the initial discussion about event
>>> time ordering, and is part of the design doc - users should be allowed to
>>> provide a UDF for extracting a time-correlated ordering field (which means
>>> the ability to choose a preferred, or authoritative, observer which assigns
>>> unambiguous ordering to events). Examples of this might include Kafka
>>> offsets as well, or any queue index for that matter. This is not yet
>>> implemented, but could (should) be in the future.
>>>
>>> The only case where these two things are (somewhat) different is the
>>> case mentioned by @Steve - if the output is stateless ParDo, which will get
>>> fused. But that is only because the processing is single-threaded per key,
>>> and therefore the ordering is implied by timer ordering (and careful here,
>>> many runners don't have this ordering 100% correct, as of now - this
>>> problem luckily appears only when there are multiple timers per key).
>>> Moreover, if there were a failure, the output might (would) go
>>> back in time anyway. If there were a shuffle operation after
>>> GBK/Combine, then the ordering is no longer guaranteed and must be
>>> explicitly taken care of.
>>>
>>> Last note, I must agree with @Rui that all these discussions are very
>>> much related to retractions (precisely the ability to implement them).
>>>
>>> Jan
>>> On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>>>
>>> Hi Aaron,
>>>
>>> Another insightful observation.
>>>
>>> Whenever an aggregation (GBK / Combine per key) has a trigger firing,
>>> there is a per-key sequence number attached. It is included in metadata
>>> known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially
>>> referred to as the "pane index". You can also make use of the "on time
>>> index" if you like. The best way to access this metadata is to add a
>>> parameter of type PaneInfo to your DoFn's @ProcessElement method. This
>>> works for stateful or stateless DoFn.
>>>
>>> Most of Beam's IO connectors do not explicitly enforce that outputs
>>> occur in pane index order but instead rely on the hope that the runner
>>> delivers panes in order to the sink. IMO this is dangerous but it has not
>>> yet caused a known issue. In practice, each "input key to output key 'path'
>>> " through a pipeline's logic does preserve order for all existing runners
>>> AFAIK and it is the formalization that is missing. It is related to an
>>> observation by +Rui Wang <ru...@google.com> that processing
>>> retractions requires the same key-to-key ordering.
>>>
>>> I will not try to formalize this notion in this email. But I will note
>>> that since it is universally assured, it would be zero cost and
>>> significantly safer to formalize it and add an annotation noting it was
>>> required. It has nothing to do with event time ordering, only trigger
>>> firing ordering.
>>>
>>> Kenn
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
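The per-key pane-index ordering Kenn describes can be enforced in a sink with a small guard. This is a plain-Java sketch; in a real Beam pipeline the index would come from the PaneInfo parameter of the DoFn's @ProcessElement method and the last-seen index would live in per-key state. The names (PaneIndexGuard, accept) are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Guard that accepts a firing only if its pane index is strictly greater
// than the last index seen for that key, mirroring the per-key pane-index
// ordering discussed above. Names are illustrative, not Beam API.
class PaneIndexGuard {
    private final Map<String, Long> lastIndex = new HashMap<>();

    /** Returns true if this firing is in order for its key. */
    boolean accept(String key, long paneIndex) {
        Long prev = lastIndex.get(key);
        if (prev != null && paneIndex <= prev) {
            return false; // stale or duplicate firing: skip the write
        }
        lastIndex.put(key, paneIndex);
        return true;
    }
}
```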
>>>
>>>
>>> On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <pa...@google.com>
>>> wrote:
>>>
>>>> The blog posts on stateful and timely computation with Beam should help
>>>> clarify a lot about how to use state and timers to do this:
>>>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>
>>>> You'll see there how there's an implicit per-single-element grouping
>>>> for each key, so state and timers should support your use case very well.
>>>>
>>>> Best
>>>> -P.
>>>>
>>>> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sn...@apache.org>
>>>> wrote:
>>>>
>>>>> If you have a pipeline that looks like Input -> GroupByKey -> ParDo,
>>>>> while it is not guaranteed, in practice the sink will observe the trigger
>>>>> firings in order (per key), since it'll be fused to the output of the GBK
>>>>> operation (in all runners I know of).
>>>>>
>>>>> There have been a couple threads about trigger ordering as well on the
>>>>> list recently that might have more information:
>>>>>
>>>>> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>>>>
>>>>> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>>>>
>>>>>
>>>>> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <at...@gmail.com> wrote:
>>>>>
>>>>>> @Jan @Pablo Thank you
>>>>>>
>>>>>> @Pablo In this case it's a single global windowed Combine/perKey,
>>>>>> triggered per element. Keys are few (client accounts) so they can live
>>>>>> forever.
>>>>>>
>>>>>> It looks like just by virtue of using a stateful ParDo I could get
>>>>>> this final execution to be "serialized" per key. (Then I could simply do
>>>>>> the compare-and-swap using Beam's state mechanism to keep track of the
>>>>>> "latest trigger timestamp" instead of having to orchestrate
>>>>>> compare-and-swap in the target store :thinking:.)
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>>> One addition, to make the list of options exhaustive, there is
>>>>>>> probably
>>>>>>> one more option
>>>>>>>
>>>>>>> c) create a ParDo keyed by primary key of your sink, cache the
>>>>>>> last
>>>>>>> write in there and compare it locally, without the need to query the
>>>>>>> database
>>>>>>>
>>>>>>> It would still need some timer to clear values after watermark +
>>>>>>> allowed
>>>>>>> lateness, because otherwise you would have to cache your whole
>>>>>>> database
>>>>>>> on workers. But because you don't need actual ordering, you just
>>>>>>> need
>>>>>>> the most recent value (if I got it right) this might be an option.
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>>>>>> > Hi Aaron,
>>>>>>> >
>>>>>>> > maybe someone else will give another option, but if I understand
>>>>>>> > correctly what you want to solve, then you essentially have to do
>>>>>>> either:
>>>>>>> >
>>>>>>> > a) use the compare & swap mechanism in the sink you described
>>>>>>> >
>>>>>>> > b) use a buffer to buffer elements inside the outputting ParDo
>>>>>>> and
>>>>>>> > only output them when watermark passes (using a timer).
>>>>>>> >
>>>>>>> > There is actually an ongoing discussion about how to make option
>>>>>>> b)
>>>>>>> > user-friendly and part of Beam itself, but currently there is no
>>>>>>> > out-of-the-box solution for that.
>>>>>>> >
>>>>>>> > Jan
>>>>>>> >
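Jan's option (b) - buffer elements and only emit them, in timestamp order, once the watermark passes - can be sketched in plain Java. In Beam the buffer would live in per-key state and be flushed by an event-time timer; the class below (WatermarkBuffer, a hypothetical name) models only the buffering logic itself:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Sketch of option (b): buffer elements, then release everything at or
// before the watermark in event-time order. Illustrative only, not Beam API.
class WatermarkBuffer<T> {
    private static final class Entry<T> {
        final long eventTimeMillis;
        final T value;
        Entry(long eventTimeMillis, T value) {
            this.eventTimeMillis = eventTimeMillis;
            this.value = value;
        }
    }

    private final List<Entry<T>> buffer = new ArrayList<>();

    void add(long eventTimeMillis, T value) {
        buffer.add(new Entry<>(eventTimeMillis, value));
    }

    /** Removes and returns all buffered elements at or before the
     *  watermark, sorted by event time. */
    List<T> advanceWatermark(long watermarkMillis) {
        List<Entry<T>> ready = new ArrayList<>();
        Iterator<Entry<T>> it = buffer.iterator();
        while (it.hasNext()) {
            Entry<T> e = it.next();
            if (e.eventTimeMillis <= watermarkMillis) {
                ready.add(e);
                it.remove();
            }
        }
        ready.sort(Comparator.comparingLong(e -> e.eventTimeMillis));
        List<T> out = new ArrayList<>();
        for (Entry<T> e : ready) {
            out.add(e.value);
        }
        return out;
    }
}
```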
>>>>>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>>>>>> >> Suppose I trigger a Combine per-element (in a high-volume stream)
>>>>>>> and
>>>>>>> >> use a ParDo as a sink.
>>>>>>> >>
>>>>>>> >> I assume there is no guarantee about the order that my ParDo will
>>>>>>> see
>>>>>>> >> these triggers, especially as it processes in parallel, anyway.
>>>>>>> >>
>>>>>>> >> That said, my sink writes to a db or cache and I would not like
>>>>>>> the
>>>>>>> >> cache to ever regress its value to something "before" what it has
>>>>>>> >> already written.
>>>>>>> >>
>>>>>>> >> Is the best way to solve this problem to always write the
>>>>>>> event-time
>>>>>>> >> in the cache and do a compare-and-swap only updating the sink if
>>>>>>> the
>>>>>>> >> triggered value in-hand is later than the target value?
>>>>>>> >>
>>>>>>> >> Or is there a better way to guarantee that my ParDo sink will
>>>>>>> process
>>>>>>> >> elements in-order? (Eg, if I can give up per-event/real-time,
>>>>>>> then a
>>>>>>> >> delay-based trigger would probably be sufficient I imagine.)
>>>>>>> >>
>>>>>>> >> Thanks for advice!
>>>>>>>
>>>>>>
Re: real real-time beam
Posted by Jan Lukavský <je...@seznam.cz>.
Hi Kenn,
On 12/4/19 5:38 AM, Kenneth Knowles wrote:
> Jan - let's try to defrag the threads on your time sorting proposal.
> This thread may have useful ideas but I want to focus on helping Aaron
> in this thread. You can link to this thread from other threads or from
> a design doc. Does this seem OK to you?
sure. :-)
I actually think the best thread to continue the discussion would be
[1]. The reason why this discussion probably got fragmented is that the
other threads seem to die out without any conclusion. :-(
Jan
[1]
https://lists.apache.org/thread.html/e2f729c7cea22553fc34421d4547132fa1c2ec01035eb4fb1a426873%40%3Cdev.beam.apache.org%3E
>
> Aaron - do you have the information you need to implement your sink?
> My impression is that you have quite a good grasp of the issues even
> before you asked.
>
> Kenn
>
> On Wed, Nov 27, 2019 at 3:05 AM Jan Lukavský <je.ik@seznam.cz
> <ma...@seznam.cz>> wrote:
>
> > Trigger firings can have decreasing event timestamps w/ the
> minimum timestamp combiner*. I do think the issue at hand is best
> analyzed in terms of the explicit ordering on panes. And I do
> think we need to have an explicit guarantee or annotation strong
> enough to describe a correct-under-all-allowed runners sink. Today
> an antagonistic runner could probably break a lot of things.
>
> Thanks for this insight. I didn't know about the relation between
> trigger firing (event) time - which is always non-decreasing - and
> the resulting timestamp of the output pane - which can be affected
> by the timestamp combiner and decrease in the cases you describe.
> What actually correlates with the pane index at all times is the
> processing time of the trigger firings. Would you say that the
> "annotation that would guarantee ordering of panes" could be viewed
> as a time-ordering annotation with an additional time domain (event
> time, processing time)? Could these two then be viewed as a single
> annotation with some distinguishing parameter?
>
> @RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)
>
> ?
>
> Event time should be probably made the default, because that is
> information that is accessible with every WindowedValue, while
> pane index is available only after GBK (or generally might be
> available after every keyed sequential operation, but is missing
> after source for instance).
>
> Jan
>
> On 11/27/19 1:32 AM, Kenneth Knowles wrote:
>>
>>
>> On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <je.ik@seznam.cz
>> <ma...@seznam.cz>> wrote:
>>
>> > I will not try to formalize this notion in this email. But
>> I will note that since it is universally assured, it would be
>> zero cost and significantly safer to formalize it and add an
>> annotation noting it was required. It has nothing to do with
>> event time ordering, only trigger firing ordering.
>>
>> I cannot agree with the last sentence (and I'm really not
>> doing this on purpose :-)). Panes generally arrive out of
>> order, as mentioned several times in the discussions linked
>> from this thread. If we want to ensure "trigger firing
>> ordering", we can use the pane index, that is correct. But -
>> that is actually equivalent to sorting by event time, because
>> pane index order will be (nearly) the same as event time
>> order. This is due to the fact that pane index and event
>> time correlate (both are monotonic).
>>
>> Trigger firings can have decreasing event timestamps w/ the
>> minimum timestamp combiner*. I do think the issue at hand is best
>> analyzed in terms of the explicit ordering on panes. And I do
>> think we need to have an explicit guarantee or annotation strong
>> enough to describe a correct-under-all-allowed runners sink.
>> Today an antagonistic runner could probably break a lot of things.
>>
>> Kenn
>>
>> *In fact, they can decrease even with the "maximum" timestamp
>> combiner, because timestamp combiners only apply to the elements
>> in that particular pane. This is weird, and maybe a design bug, but
>> good to know about.
>>
>> The pane index "only" solves the issue of preserving ordering
>> even in case where there are multiple firings within the same
>> timestamp (regardless of granularity). This was mentioned in
>> the initial discussion about event time ordering, and is part
>> of the design doc - users should be allowed to provide a UDF
>> for extracting a time-correlated ordering field (which means
>> the ability to choose a preferred, or authoritative, observer
>> which assigns unambiguous ordering to events). Examples of
>> this might include Kafka offsets as well, or any queue index
>> for that matter. This is not yet implemented, but could
>> (should) be in the future.
>>
>> The only case where these two things are (somewhat) different
>> is the case mentioned by @Steve - if the output is stateless
>> ParDo, which will get fused. But that is only because the
>> processing is single-threaded per key, and therefore the
>> ordering is implied by timer ordering (and careful here, many
>> runners don't have this ordering 100% correct, as of now -
>> this problem luckily appears only when there are multiple
>> timers per key). Moreover, if there were a failure, the
>> output might (would) go back in time anyway. If there
>> were a shuffle operation after GBK/Combine, then the
>> ordering is no longer guaranteed and must be explicitly taken
>> care of.
>>
>> Last note, I must agree with @Rui that all these discussions
>> are very much related to retractions (precisely the ability
>> to implement them).
>>
>> Jan
>>
>> On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>>> Hi Aaron,
>>>
>>> Another insightful observation.
>>>
>>> Whenever an aggregation (GBK / Combine per key) has a
>>> trigger firing, there is a per-key sequence number attached.
>>> It is included in metadata known as "PaneInfo" [1]. The
>>> value of PaneInfo.getIndex() is colloquially referred to as
>>> the "pane index". You can also make use of the "on time
>>> index" if you like. The best way to access this metadata is
>>> to add a parameter of type PaneInfo to your
>>> DoFn's @ProcessElement method. This works for stateful or
>>> stateless DoFn.
>>>
>>> Most of Beam's IO connectors do not explicitly enforce that
>>> outputs occur in pane index order but instead rely on the
>>> hope that the runner delivers panes in order to the sink.
>>> IMO this is dangerous but it has not yet caused a known
>>> issue. In practice, each "input key to output key 'path' "
>>> through a pipeline's logic does preserve order for all
>>> existing runners AFAIK and it is the formalization that is
>>> missing. It is related to an observation by +Rui Wang
>>> <ma...@google.com> that processing retractions
>>> requires the same key-to-key ordering.
>>>
>>> I will not try to formalize this notion in this email. But I
>>> will note that since it is universally assured, it would be
>>> zero cost and significantly safer to formalize it and add an
>>> annotation noting it was required. It has nothing to do with
>>> event time ordering, only trigger firing ordering.
>>>
>>> Kenn
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>>>
>>>
>>> On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada
>>> <pabloem@google.com <ma...@google.com>> wrote:
>>>
>>> The blog posts on stateful and timely computation with
>>> Beam should help clarify a lot about how to use state
>>> and timers to do this:
>>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>
>>> You'll see there how there's an implicit
>>> per-single-element grouping for each key, so state and
>>> timers should support your use case very well.
>>>
>>> Best
>>> -P.
>>>
>>> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz
>>> <sniemitz@apache.org <ma...@apache.org>> wrote:
>>>
>>> If you have a pipeline that looks like Input ->
>>> GroupByKey -> ParDo, while it is not guaranteed, in
>>> practice the sink will observe the trigger firings
>>> in order (per key), since it'll be fused to the
>>> output of the GBK operation (in all runners I know of).
>>>
>>> There have been a couple threads about trigger
>>> ordering as well on the list recently that might
>>> have more information:
>>> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>>
>>>
>>> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon
>>> <atdixon@gmail.com <ma...@gmail.com>> wrote:
>>>
>>> @Jan @Pablo Thank you
>>>
>>> @Pablo In this case it's a single global
>>> windowed Combine/perKey, triggered per element.
>>> Keys are few (client accounts) so they can live
>>> forever.
>>>
>>> It looks like just by virtue of using a stateful
>>> ParDo I could get this final execution to be
>>> "serialized" per key. (Then I could simply do
>>> the compare-and-swap using Beam's state
>>> mechanism to keep track of the "latest trigger
>>> timestamp" instead of having to orchestrate
>>> compare-and-swap in the target store :thinking:.)
>>>
>>>
>>>
>>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský
>>> <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>
>>> One addition, to make the list of options
>>> exhaustive, there is probably
>>> one more option
>>>
>>> c) create a ParDo keyed by primary key of
>>> your sink, cache the last
>>> write in there and compare it locally,
>>> without the need to query the
>>> database
>>>
>>> It would still need some timer to clear
>>> values after watermark + allowed
>>> lateness, because otherwise you would have
>>> to cache your whole database
>>> on workers. But because you don't need
>>> actual ordering, you just need
>>> the most recent value (if I got it right)
>>> this might be an option.
>>>
>>> Jan
>>>
>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>> > Hi Aaron,
>>> >
>>> > maybe someone else will give another
>>> option, but if I understand
>>> > correctly what you want to solve, then you
>>> essentially have to do either:
>>> >
>>> > a) use the compare & swap mechanism in
>>> the sink you described
>>> >
>>> > b) use a buffer to buffer elements inside
>>> the outputting ParDo and
>>> > only output them when watermark passes
>>> (using a timer).
>>> >
>>> > There is actually an ongoing discussion
>>> about how to make option b)
>>> > user-friendly and part of Beam itself, but
>>> currently there is no
>>> > out-of-the-box solution for that.
>>> >
>>> > Jan
>>> >
>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>> >> Suppose I trigger a Combine per-element
>>> (in a high-volume stream) and
>>> >> use a ParDo as a sink.
>>> >>
>>> >> I assume there is no guarantee about the
>>> order that my ParDo will see
>>> >> these triggers, especially as it
>>> processes in parallel, anyway.
>>> >>
>>> >> That said, my sink writes to a db or
>>> cache and I would not like the
>>> >> cache to ever regress its value to
>>> something "before" what it has
>>> >> already written.
>>> >>
>>> >> Is the best way to solve this problem to
>>> always write the event-time
>>> >> in the cache and do a compare-and-swap
>>> only updating the sink if the
>>> >> triggered value in-hand is later than the
>>> target value?
>>> >>
>>> >> Or is there a better way to guarantee
>>> that my ParDo sink will process
>>> >> elements in-order? (Eg, if I can give up
>>> per-event/real-time, then a
>>> >> delay-based trigger would probably be
>>> sufficient I imagine.)
>>> >>
>>> >> Thanks for advice!
>>>
Re: real real-time beam
Posted by Kenneth Knowles <ke...@apache.org>.
Jan - let's try to defrag the threads on your time sorting proposal. This
thread may have useful ideas but I want to focus on helping Aaron in this
thread. You can link to this thread from other threads or from a design
doc. Does this seem OK to you?
Aaron - do you have the information you need to implement your sink? My
impression is that you have quite a good grasp of the issues even before
you asked.
Kenn
On Wed, Nov 27, 2019 at 3:05 AM Jan Lukavský <je...@seznam.cz> wrote:
> > Trigger firings can have decreasing event timestamps w/ the minimum
> timestamp combiner*. I do think the issue at hand is best analyzed in terms
> of the explicit ordering on panes. And I do think we need to have an
> explicit guarantee or annotation strong enough to describe a
> correct-under-all-allowed runners sink. Today an antagonistic runner could
> probably break a lot of things.
>
> Thanks for this insight. I didn't know about the relation between trigger
> firing (event) time - which is always non-decreasing - and the resulting
> timestamp of the output pane - which can be affected by the timestamp
> combiner and decrease in the cases you describe. What actually correlates
> with the pane index at all times is the processing time of the trigger
> firings. Would you say that the "annotation that would guarantee ordering
> of panes" could be viewed as a time-ordering annotation with an additional
> time domain (event time, processing time)? Could these two then be viewed
> as a single annotation with some distinguishing parameter?
>
> @RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)
>
> ?
>
> Event time should be probably made the default, because that is
> information that is accessible with every WindowedValue, while pane index
> is available only after GBK (or generally might be available after every
> keyed sequential operation, but is missing after source for instance).
>
> Jan
> On 11/27/19 1:32 AM, Kenneth Knowles wrote:
>
>
>
> On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> > I will not try to formalize this notion in this email. But I will note
>> that since it is universally assured, it would be zero cost and
>> significantly safer to formalize it and add an annotation noting it was
>> required. It has nothing to do with event time ordering, only trigger
>> firing ordering.
>>
>> I cannot agree with the last sentence (and I'm really not doing this on
>> purpose :-)). Panes generally arrive out of order, as mentioned several
>> times in the discussions linked from this thread. If we want to ensure
>> "trigger firing ordering", we can use the pane index, that is correct. But
>> - that is actually equivalent to sorting by event time, because pane index
>> order will be (nearly) the same as event time order. This is due to the
>> fact that pane index and event time correlate (both are monotonic).
>>
> Trigger firings can have decreasing event timestamps w/ the minimum
> timestamp combiner*. I do think the issue at hand is best analyzed in terms
> of the explicit ordering on panes. And I do think we need to have an
> explicit guarantee or annotation strong enough to describe a
> correct-under-all-allowed runners sink. Today an antagonistic runner could
> probably break a lot of things.
>
> Kenn
>
> *In fact, they can decrease even with the "maximum" timestamp combiner,
> because timestamp combiners only apply to the elements in that particular
> pane. This is weird, and maybe a design bug, but good to know about.
>
>
>> The pane index "only" solves the issue of preserving ordering even in
>> case where there are multiple firings within the same timestamp (regardless
>> of granularity). This was mentioned in the initial discussion about event
>> time ordering, and is part of the design doc - users should be allowed to
>> provide a UDF for extracting a time-correlated ordering field (which means
>> the ability to choose a preferred, or authoritative, observer which assigns
>> unambiguous ordering to events). Examples of this might include Kafka
>> offsets as well, or any queue index for that matter. This is not yet
>> implemented, but could (should) be in the future.
>>
>> The only case where these two things are (somewhat) different is the case
>> mentioned by @Steve - if the output is stateless ParDo, which will get
>> fused. But that is only because the processing is single-threaded per key,
>> and therefore the ordering is implied by timer ordering (and careful here,
>> many runners don't have this ordering 100% correct, as of now - this
>> problem luckily appears only when there are multiple timers per key).
>> Moreover, if there should be a failure, then the output might (would) get
>> back in time anyway. If there would be a shuffle operation after
>> GBK/Combine, then the ordering is no longer guaranteed and must be
>> explicitly taken care of.
>>
>> Last note, I must agree with @Rui that all these discussions are very
>> much related to retractions (precisely the ability to implement them).
>>
>> Jan
>> On 11/26/19 7:34 AM, Kenneth Knowles wrote:
>>
>> Hi Aaron,
>>
>> Another insightful observation.
>>
>> Whenever an aggregation (GBK / Combine per key) has a trigger firing,
>> there is a per-key sequence number attached. It is included in metadata
>> known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially
>> referred to as the "pane index". You can also make use of the "on time
>> index" if you like. The best way to access this metadata is to add a
>> parameter of type PaneInfo to your DoFn's @ProcessElement method. This
>> works for stateful or stateless DoFn.
>>
>> Most of Beam's IO connectors do not explicitly enforce that outputs occur
>> in pane index order but instead rely on the hope that the runner delivers
>> panes in order to the sink. IMO this is dangerous but it has not yet caused
>> a known issue. In practice, each "input key to output key 'path' " through
>> a pipeline's logic does preserve order for all existing runners AFAIK and
>> it is the formalization that is missing. It is related to an observation by +Rui
>> Wang <ru...@google.com> that processing retractions requires the same
>> key-to-key ordering.
>>
>> I will not try to formalize this notion in this email. But I will note
>> that since it is universally assured, it would be zero cost and
>> significantly safer to formalize it and add an annotation noting it was
>> required. It has nothing to do with event time ordering, only trigger
>> firing ordering.
>>
>> Kenn
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
>>
>>
>> On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <pa...@google.com> wrote:
>>
>>> The blog posts on stateful and timely computation with Beam should help
>>> clarify a lot about how to use state and timers to do this:
>>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>
>>> You'll see there how there's an implicit per-single-element grouping for
>>> each key, so state and timers should support your use case very well.
>>>
>>> Best
>>> -P.
>>>
>>> On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sn...@apache.org>
>>> wrote:
>>>
>>>> If you have a pipeline that looks like Input -> GroupByKey -> ParDo,
>>>> while it is not guaranteed, in practice the sink will observe the trigger
>>>> firings in order (per key), since it'll be fused to the output of the GBK
>>>> operation (in all runners I know of).
>>>>
>>>> There have been a couple threads about trigger ordering as well on the
>>>> list recently that might have more information:
>>>>
>>>> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>>>>
>>>> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
>>>>
>>>>
>>>> On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <at...@gmail.com> wrote:
>>>>
>>>>> @Jan @Pablo Thank you
>>>>>
>>>>> @Pablo In this case it's a single global windowed Combine/perKey,
>>>>> triggered per element. Keys are few (client accounts) so they can live
>>>>> forever.
>>>>>
>>>>> It looks like just by virtue of using a stateful ParDo I could get
>>>>> this final execution to be "serialized" per key. (Then I could simply do
>>>>> the compare-and-swap using Beam's state mechanism to keep track of the
>>>>> "latest trigger timestamp" instead of having to orchestrate
>>>>> compare-and-swap in the target store :thinking:.)
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> One addition, to make the list of options exhaustive, there is
>>>>>> probably
>>>>>> one more option
>>>>>>
>>>>>> c) create a ParDo keyed by primary key of your sink, cache the last
>>>>>> write in there and compare it locally, without the need to query the
>>>>>> database
>>>>>>
>>>>>> It would still need some timer to clear values after watermark +
>>>>>> allowed
>>>>>> lateness, because otherwise you would have to cache your whole
>>>>>> database
>>>>>> on workers. But because you don't need actual ordering, you just need
>>>>>> the most recent value (if I got it right) this might be an option.
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 11/25/19 10:53 PM, Jan Lukavský wrote:
>>>>>> > Hi Aaron,
>>>>>> >
>>>>>> > maybe someone else will give another option, but if I understand
>>>>>> > correctly what you want to solve, then you essentially have to do
>>>>>> either:
>>>>>> >
>>>>>> > a) use the compare & swap mechanism in the sink you described
>>>>>> >
>>>>>> > b) use a buffer to buffer elements inside the outputting ParDo and
>>>>>> > only output them when watermark passes (using a timer).
>>>>>> >
>>>>>> > There is actually an ongoing discussion about how to make option b)
>>>>>> > user-friendly and part of Beam itself, but currently there is no
>>>>>> > out-of-the-box solution for that.
>>>>>> >
>>>>>> > Jan
>>>>>> >
>>>>>> > On 11/25/19 10:27 PM, Aaron Dixon wrote:
>>>>>> >> Suppose I trigger a Combine per-element (in a high-volume stream)
>>>>>> and
>>>>>> >> use a ParDo as a sink.
>>>>>> >>
>>>>>> >> I assume there is no guarantee about the order that my ParDo will
>>>>>> see
>>>>>> >> these triggers, especially as it processes in parallel, anyway.
>>>>>> >>
>>>>>> >> That said, my sink writes to a db or cache and I would not like
>>>>>> the
>>>>>> >> cache to ever regress its value to something "before" what it has
>>>>>> >> already written.
>>>>>> >>
>>>>>> >> Is the best way to solve this problem to always write the
>>>>>> event-time
>>>>>> >> in the cache and do a compare-and-swap only updating the sink if
>>>>>> the
>>>>>> >> triggered value in-hand is later than the target value?
>>>>>> >>
>>>>>> >> Or is there a better way to guarantee that my ParDo sink will
>>>>>> process
>>>>>> >> elements in-order? (Eg, if I can give up per-event/real-time, then
>>>>>> a
>>>>>> >> delay-based trigger would probably be sufficient I imagine.)
>>>>>> >>
>>>>>> >> Thanks for advice!
>>>>>>
>>>>>
Re: real real-time beam
Posted by Jan Lukavský <je...@seznam.cz>.
> Trigger firings can have decreasing event timestamps w/ the minimum
timestamp combiner*. I do think the issue at hand is best analyzed in
terms of the explicit ordering on panes. And I do think we need to have
an explicit guarantee or annotation strong enough to describe a
correct-under-all-allowed runners sink. Today an antagonistic runner
could probably break a lot of things.
Thanks for this insight. I didn't know about the relation between
trigger firing (event) time - which is always non-decreasing - and the
resulting timestamp of the output pane - which can be affected by the
timestamp combiner and decrease in the cases you describe. What
correlates with the pane index at all times is the processing time of
the trigger firings. Would you say that the "annotation that would
guarantee ordering of panes" could be viewed as a time-ordering
annotation with an additional time domain (event time, processing
time)? Could these two then be viewed as a single annotation with some
distinguishing parameter?
@RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)
?
Event time should probably be the default, because that information is
accessible with every WindowedValue, while the pane index is available
only after a GBK (or, more generally, after any keyed sequential
operation, but it is missing after a source, for instance).
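To make the proposal concrete, here is a minimal plain-Java sketch - not
Beam code; the Domain enum and the parameterized annotation above are
hypothetical (the Java SDK's @RequiresTimeSortedInput takes no such
parameter) - showing how the distinguishing parameter would merely swap
the comparator while the buffering machinery stays shared:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of a single time-ordering contract parameterized
// by domain. With a minimum timestamp combiner, a later pane can carry
// an earlier event time, so the two domains can order panes differently.
public class DomainOrdering {

  public enum Domain { EVENT_TIME, PANE_INDEX }

  // Minimal stand-in for the ordering-relevant metadata of a fired pane.
  public static class Pane {
    final long eventTimeMillis;
    final long paneIndex;
    final String value;

    public Pane(long eventTimeMillis, long paneIndex, String value) {
      this.eventTimeMillis = eventTimeMillis;
      this.paneIndex = paneIndex;
      this.value = value;
    }
  }

  // The domain parameter only selects the comparator; the buffering and
  // watermark-driven flushing would be shared machinery.
  public static Comparator<Pane> orderingFor(Domain domain) {
    if (domain == Domain.EVENT_TIME) {
      return Comparator.comparingLong(p -> p.eventTimeMillis);
    }
    return Comparator.comparingLong(p -> p.paneIndex);
  }

  public static List<String> sortedValues(List<Pane> panes, Domain domain) {
    List<Pane> copy = new ArrayList<>(panes);
    copy.sort(orderingFor(domain));
    List<String> out = new ArrayList<>();
    for (Pane p : copy) {
      out.add(p.value);
    }
    return out;
  }
}
```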
Jan
Re: real real-time beam
Posted by Kenneth Knowles <ke...@apache.org>.
On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský <je...@seznam.cz> wrote:
> > I will not try to formalize this notion in this email. But I will note
> that since it is universally assured, it would be zero cost and
> significantly safer to formalize it and add an annotation noting it was
> required. It has nothing to do with event time ordering, only trigger
> firing ordering.
>
> I cannot agree with the last sentence (and I'm really not doing this on
> purpose :-)). Panes generally arrive out of order, as mentioned several
> times in the discussions linked from this thread. If we want to ensure
> "trigger firing ordering", we can use the pane index, that is correct. But
> - that is actually equivalent to sorting by event time, because pane index
> order will be (nearly) the same as event time order. This is due to the
> fact, that pane index and event time correlate (both are monotonic).
>
Trigger firings can have decreasing event timestamps w/ the minimum
timestamp combiner*. I do think the issue at hand is best analyzed in terms
of the explicit ordering on panes. And I do think we need to have an
explicit guarantee or annotation strong enough to describe a
correct-under-all-allowed runners sink. Today an antagonistic runner could
probably break a lot of things.
Kenn
*In fact, they can decrease even via the "maximum" timestamp combiner,
because timestamp combiners only apply to the elements in that
particular pane. This is weird, and maybe a design bug, but good to
know about.
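A tiny plain-Java illustration of the footnote (a simulation, not Beam
code): under a MIN timestamp combiner, each pane's output timestamp is
the minimum over that pane's elements only, so the pane index can
increase while the output timestamp decreases.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates trigger firings under a MIN timestamp combiner: each pane's
// output timestamp is the minimum over the elements in that pane only,
// so a later pane (higher index) can carry an earlier timestamp.
public class PaneTimestamps {

  public static class Firing {
    public final long paneIndex;
    public final long outputTimestamp;

    public Firing(long paneIndex, long outputTimestamp) {
      this.paneIndex = paneIndex;
      this.outputTimestamp = outputTimestamp;
    }
  }

  // panes[i] holds the element timestamps that arrive in pane i.
  public static List<Firing> fire(long[][] panes) {
    List<Firing> firings = new ArrayList<>();
    for (int i = 0; i < panes.length; i++) {
      long min = Long.MAX_VALUE;
      for (long ts : panes[i]) {
        min = Math.min(min, ts); // MIN combiner over this pane's elements
      }
      firings.add(new Firing(i, min));
    }
    return firings;
  }
}
```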
Re: real real-time beam
Posted by Jan Lukavský <je...@seznam.cz>.
> I will not try to formalize this notion in this email. But I will
note that since it is universally assured, it would be zero cost and
significantly safer to formalize it and add an annotation noting it was
required. It has nothing to do with event time ordering, only trigger
firing ordering.
I cannot agree with the last sentence (and I'm really not doing this on
purpose :-)). Panes generally arrive out of order, as mentioned several
times in the discussions linked from this thread. If we want to ensure
"trigger firing ordering", we can use the pane index, that is correct.
But - that is actually equivalent to sorting by event time, because pane
index order will be (nearly) the same as event time order. This is due
to the fact that pane index and event time correlate (both are
monotonic). The pane index "only" solves the issue of preserving
ordering even in cases where there are multiple firings within the same
timestamp (regardless of granularity). This was mentioned in the initial
discussion about event time ordering, and is part of the design doc -
users should be allowed to provide a UDF for extracting a
time-correlated ordering field (which means the ability to choose a
preferred, or authoritative, observer that assigns an unambiguous
ordering to events). Examples of this include Kafka offsets, or any
queue index for that matter. This is not yet implemented, but could
(should) be in the future.
The only case where these two things are (somewhat) different is the
case mentioned by @Steve - if the output is a stateless ParDo, which
will get fused. But that is only because the processing is
single-threaded per key, and therefore the ordering is implied by timer
ordering (and be careful here: many runners don't have this ordering
100% correct as of now - luckily, this problem appears only when there
are multiple timers per key). Moreover, if there were a failure, the
output might (would) go back in time anyway. If there were a shuffle
operation after the GBK/Combine, the ordering would no longer be
guaranteed and would have to be explicitly taken care of.
Last note, I must agree with @Rui that all these discussions are very
much related to retractions (precisely the ability to implement them).
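The compare-and-swap guard this thread keeps returning to can be
sketched in plain Java (not a Beam transform; in a real pipeline the
per-key state would live in Beam state and, as noted in this thread,
would need a timer to clear it after watermark + allowed lateness):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a sink guard that never lets a key regress: per primary
// key, remember the highest ordering field written so far (event time,
// pane index, or a Kafka offset) and drop anything older.
public class MonotonicSink {

  private final Map<String, Long> lastWritten = new HashMap<>();
  private final Map<String, String> store = new HashMap<>();

  // Returns true if the write was applied, false if it was a regression.
  public boolean write(String key, long orderingField, String value) {
    Long last = lastWritten.get(key);
    if (last != null && orderingField <= last) {
      return false; // out-of-order pane; the sink already has a newer value
    }
    lastWritten.put(key, orderingField);
    store.put(key, value);
    return true;
  }

  public String read(String key) {
    return store.get(key);
  }
}
```

The ordering field can be whatever time-correlated, monotonic value is
available per key - the pane index, the event timestamp, or a queue
offset, per the discussion above.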
Jan
On 11/26/19 7:34 AM, Kenneth Knowles wrote:
> Hi Aaron,
>
> Another insightful observation.
>
> Whenever an aggregation (GBK / Combine per key) has a trigger firing,
> there is a per-key sequence number attached. It is included in
> metadata known as "PaneInfo" [1]. The value of PaneInfo.getIndex() is
> colloquially referred to as the "pane index". You can also make use of
> the "on time index" if you like. The best way to access this metadata
> is to add a parameter of type PaneInfo to your DoFn's @ProcessElement
> method. This works for stateful or stateless DoFn.
>
> Most of Beam's IO connectors do not explicitly enforce that outputs
> occur in pane index order but instead rely on the hope that the runner
> delivers panes in order to the sink. IMO this is dangerous but it has
> not yet caused a known issue. In practice, each "input key to output
> key 'path' " through a pipeline's logic does preserve order for all
> existing runners AFAIK and it is the formalization that is missing. It
> is related to an observation by +Rui Wang
> <ma...@google.com> that processing retractions requires the
> same key-to-key ordering.
>
> I will not try to formalize this notion in this email. But I will note
> that since it is universally assured, it would be zero cost and
> significantly safer to formalize it and add an annotation noting it
> was required. It has nothing to do with event time ordering, only
> trigger firing ordering.
>
> Kenn
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
Re: real real-time beam
Posted by Kenneth Knowles <ke...@apache.org>.
Hi Aaron,
Another insightful observation.
Whenever an aggregation (GBK / Combine per key) has a trigger firing, there
is a per-key sequence number attached. It is included in metadata known as
"PaneInfo" [1]. The value of PaneInfo.getIndex() is colloquially referred
to as the "pane index". You can also make use of the "on time index" if you
like. The best way to access this metadata is to add a parameter of type
PaneInfo to your DoFn's @ProcessElement method. This works for stateful or
stateless DoFn.
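To make the pane-index idea concrete, here is a minimal Python model of pane-index gating, written outside the Beam SDK (the `PaneTrackingSink` class and its method names are invented for illustration): a sink remembers the last pane index it applied per key, mirroring `PaneInfo.getIndex()`, and drops any firing that arrives with a smaller index.

```python
class PaneTrackingSink:
    """Toy model of a sink that refuses to regress: a triggered value
    is applied only if its pane index is newer than the last one
    applied for that key (mirrors PaneInfo.getIndex() in Beam)."""

    def __init__(self):
        self.values = {}      # key -> latest applied value
        self.last_index = {}  # key -> pane index of that value

    def write(self, key, pane_index, value):
        # Compare-and-swap on the pane index: an out-of-order firing
        # (smaller index) is dropped instead of overwriting.
        if pane_index > self.last_index.get(key, -1):
            self.last_index[key] = pane_index
            self.values[key] = value
            return True
        return False


sink = PaneTrackingSink()
sink.write("acct-1", 0, 10)
sink.write("acct-1", 2, 30)  # pane 1 was delayed in flight...
sink.write("acct-1", 1, 20)  # ...and is ignored when it shows up
```

The same comparison could be done against the pane index stored alongside the value in the target store, if the store supports a conditional write.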
Most of Beam's IO connectors do not explicitly enforce that outputs occur
in pane index order but instead rely on the hope that the runner delivers
panes in order to the sink. IMO this is dangerous but it has not yet caused
a known issue. In practice, each "input key to output key" path through
a pipeline's logic does preserve order for all existing runners AFAIK and
it is the formalization that is missing. It is related to an
observation by +Rui
Wang <ru...@google.com> that processing retractions requires the same
key-to-key ordering.
I will not try to formalize this notion in this email. But I will note that
since it is universally assured, it would be zero cost and significantly
safer to formalize it and add an annotation noting it was required. It has
nothing to do with event time ordering, only trigger firing ordering.
Kenn
[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L557
On Mon, Nov 25, 2019 at 4:06 PM Pablo Estrada <pa...@google.com> wrote:
> The blog posts on stateful and timely computation with Beam should help
> clarify a lot about how to use state and timers to do this:
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>
> You'll see there how there's an implicit per-single-element grouping for
> each key, so state and timers should support your use case very well.
>
> Best
> -P.
Re: real real-time beam
Posted by Pablo Estrada <pa...@google.com>.
The blog posts on stateful and timely computation with Beam should help
clarify a lot about how to use state and timers to do this:
https://beam.apache.org/blog/2017/02/13/stateful-processing.html
https://beam.apache.org/blog/2017/08/28/timely-processing.html
You'll see there how there's an implicit per-single-element grouping for
each key, so state and timers should support your use case very well.
Best
-P.
On Mon, Nov 25, 2019 at 3:47 PM Steve Niemitz <sn...@apache.org> wrote:
> If you have a pipeline that looks like Input -> GroupByKey -> ParDo, while
> it is not guaranteed, in practice the sink will observe the trigger firings
> in order (per key), since it'll be fused to the output of the GBK operation
> (in all runners I know of).
>
> There have been a couple threads about trigger ordering as well on the
> list recently that might have more information:
>
> https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
>
> https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
Re: real real-time beam
Posted by Steve Niemitz <sn...@apache.org>.
If you have a pipeline that looks like Input -> GroupByKey -> ParDo, while
it is not guaranteed, in practice the sink will observe the trigger firings
in order (per key), since it'll be fused to the output of the GBK operation
(in all runners I know of).
There have been a couple threads about trigger ordering as well on the list
recently that might have more information:
https://lists.apache.org/thread.html/b61a908289a692dbd80dd6a869759eacd45b308cb3873bfb77c4def6@%3Cdev.beam.apache.org%3E
https://lists.apache.org/thread.html/20d11046d26174969ef44a781e409a1cb9f7c736e605fa40fdf98397@%3Cuser.beam.apache.org%3E
On Mon, Nov 25, 2019 at 5:52 PM Aaron Dixon <at...@gmail.com> wrote:
> @Jan @Pablo Thank you
>
> @Pablo In this case it's a single global windowed Combine/perKey,
> triggered per element. Keys are few (client accounts) so they can live
> forever.
>
> It looks like just by virtue of using a stateful ParDo I could get this
> final execution to be "serialized" per key. (Then I could simply do the
> compare-and-swap using Beam's state mechanism to keep track of the "latest
> trigger timestamp" instead of having to orchestrate compare-and-swap in the
> target store :thinking:.)
Re: real real-time beam
Posted by Aaron Dixon <at...@gmail.com>.
@Jan @Pablo Thank you
@Pablo In this case it's a single global windowed Combine/perKey, triggered
per element. Keys are few (client accounts) so they can live forever.
It looks like just by virtue of using a stateful ParDo I could get this
final execution to be "serialized" per key. (Then I could simply do the
compare-and-swap using Beam's state mechanism to keep track of the "latest
trigger timestamp" instead of having to orchestrate compare-and-swap in the
target store :thinking:.)
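A rough Python sketch of that idea, independent of the Beam API (a plain dict stands in for per-key `ValueState`, and `make_guard` is a made-up name): the stateful ParDo keeps the latest event time written per key and only lets a triggered value through if it is strictly newer.

```python
def make_guard():
    """Models the compare-and-swap kept in Beam state rather than in
    the target store: per-key state remembers the latest event time
    written, and a triggered value passes only if strictly newer."""
    state = {}  # stands in for per-key ValueState holding a timestamp

    def process(key, event_time, value):
        if event_time > state.get(key, float("-inf")):
            state[key] = event_time
            return (key, value)  # safe to write to the cache/db
        return None              # a stale firing arrived late: drop it

    return process


guard = make_guard()
first = guard("acct-1", 100, "v1")  # newest so far: passes
stale = guard("acct-1", 90, "v0")   # older event time: dropped
```

Because Beam serializes stateful-ParDo execution per key, this check does not race with itself, which is what makes it a workable replacement for compare-and-swap in the store.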
On Mon, Nov 25, 2019 at 4:14 PM Jan Lukavský <je...@seznam.cz> wrote:
> One addition, to make the list of options exhaustive, there is probably
> one more option
>
> c) create a ParDo keyed by primary key of your sink, cache the last
> write in there and compare it locally, without the need to query the
> database
>
> It would still need some timer to clear values after watermark + allowed
> lateness, because otherwise you would have to cache your whole database
> on workers. But because you don't need actual ordering, you just need
> the most recent value (if I got it right) this might be an option.
>
> Jan
Re: real real-time beam
Posted by Jan Lukavský <je...@seznam.cz>.
One addition, to make the list of options exhaustive, there is probably
one more option
c) create a ParDo keyed by primary key of your sink, cache the last
write in there and compare it locally, without the need to query the
database
It would still need some timer to clear values after watermark + allowed
lateness, because otherwise you would have to cache your whole database
on workers. But because you don't need actual ordering, you just need
the most recent value (if I got it right) this might be an option.
Jan
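The cleanup timer Jan describes can be sketched in a few lines of plain Python (the `evict` helper and the dict-as-cache are illustrative stand-ins for Beam state and an event-time timer): once the watermark has passed an entry's timestamp plus allowed lateness, no late data can supersede it, so the local copy can be dropped.

```python
def evict(cache, watermark, allowed_lateness):
    """Models the cleanup timer: an entry whose timestamp has fallen
    more than allowed_lateness behind the watermark can no longer be
    superseded by late data, so it is safe to drop, keeping the
    per-worker cache bounded."""
    cutoff = watermark - allowed_lateness
    for key in [k for k, ts in cache.items() if ts < cutoff]:
        del cache[key]


last_writes = {"acct-1": 50, "acct-2": 120}  # key -> event time of last write
evict(last_writes, watermark=100, allowed_lateness=30)  # cutoff = 70
```

After the call, `acct-1` (timestamp 50, past the cutoff of 70) is evicted while `acct-2` survives.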
On 11/25/19 10:53 PM, Jan Lukavský wrote:
> Hi Aaron,
>
> maybe someone else will give another option, but if I understand
> correctly what you want to solve, then you essentially have to do either:
>
> a) use the compare & swap mechanism in the sink you described
>
> b) use a buffer to buffer elements inside the outputting ParDo and
> only output them when watermark passes (using a timer).
>
> There is actually an ongoing discussion about how to make option b)
> user-friendly and part of Beam itself, but currently there is no
> out-of-the-box solution for that.
>
> Jan
Re: real real-time beam
Posted by Jan Lukavský <je...@seznam.cz>.
Hi Aaron,
maybe someone else will give another option, but if I understand
correctly what you want to solve, then you essentially have to do either:
a) use the compare & swap mechanism in the sink you described
b) use a buffer to buffer elements inside the outputting ParDo and
only output them when watermark passes (using a timer).
There is actually an ongoing discussion about how to make option b)
user-friendly and part of Beam itself, but currently there is no
out-of-the-box solution for that.
Jan
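Option b) can be modeled in plain Python like this (the `WatermarkBuffer` class is invented for illustration; in Beam it would be bag state plus an event-time timer): elements are buffered per key and released in timestamp order only once the watermark passes them.

```python
import bisect


class WatermarkBuffer:
    """Toy model of option b): buffer elements per key and release
    them in timestamp order once the watermark passes them."""

    def __init__(self):
        self.buffers = {}  # key -> sorted list of (timestamp, value)

    def add(self, key, timestamp, value):
        # Keep each key's buffer sorted by timestamp on insertion.
        bisect.insort(self.buffers.setdefault(key, []), (timestamp, value))

    def on_watermark(self, key, watermark):
        # Fired by the (modeled) timer: emit everything at or before
        # the watermark, in order, and keep the rest buffered.
        buf = self.buffers.get(key, [])
        ready = [v for t, v in buf if t <= watermark]
        self.buffers[key] = [(t, v) for t, v in buf if t > watermark]
        return ready


wb = WatermarkBuffer()
wb.add("k", 5, "b")
wb.add("k", 3, "a")
wb.add("k", 9, "c")
out = wb.on_watermark("k", 6)  # releases elements at or before t=6
```

The cost is latency: nothing is emitted until the watermark advances, which is exactly the trade-off against option a)'s immediate compare-and-swap writes.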
On 11/25/19 10:27 PM, Aaron Dixon wrote:
> Suppose I trigger a Combine per-element (in a high-volume stream) and
> use a ParDo as a sink.
>
> I assume there is no guarantee about the order that my ParDo will see
> these triggers, especially as it processes in parallel, anyway.
>
> That said, my sink writes to a db or cache and I would not like the
> cache to ever regress its value to something "before" what it has
> already written.
>
> Is the best way to solve this problem to always write the event-time
> in the cache and do a compare-and-swap only updating the sink if the
> triggered value in-hand is later than the target value?
>
> Or is there a better way to guarantee that my ParDo sink will process
> elements in order? (E.g., if I can give up per-event/real-time, then a
> delay-based trigger would probably be sufficient I imagine.)
>
> Thanks for advice!