Posted to dev@beam.apache.org by Catlyn Kong <ca...@yelp.com> on 2020/06/11 01:34:16 UTC

Re: [External] Re: Ensuring messages are processed and emitted in-order

Thank y’all for the input!


About RequiresTimeSortedInput, we were thinking of the following two
potential approaches:

   1. Assign the Kafka offset as the element timestamp while doing a
      GroupByKey on partition_id in a GlobalWindow (a rough sketch follows
      after this list).

   2. Rely on the fact that Flink consumes from Kafka partitions in offset
      order and assign ingestion time as the timestamp. (We're using our own
      non-KafkaIO based Kafka consumer extended from FlinkKafkaConsumer011 and
      thus have direct control over timestamp and watermark assignment.)
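
To make approach 1 concrete, here is a rough sketch of what we have in mind
(the record shape, the in-memory fake source, and the step names are purely
illustrative; our real input is the custom consumer mentioned above, and the
firing behavior of the GroupByKey is exactly the open question):

import apache_beam as beam
from apache_beam.transforms.window import GlobalWindows, TimestampedValue

# Illustrative record shape from our custom consumer: (partition, offset, payload).
FAKE_RECORDS = [(0, 12, 'a'), (0, 13, 'b'), (1, 7, 'c')]


def offset_as_timestamp(record):
    partition, offset, payload = record
    # Use the Kafka offset as the element timestamp and key by partition, so
    # a time-sorted downstream stage would see each partition in offset order.
    return TimestampedValue((partition, (offset, payload)), offset)


with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | 'FakeSource' >> beam.Create(FAKE_RECORDS)  # stand-in for our consumer
        | 'OffsetAsTimestamp' >> beam.Map(offset_as_timestamp)
        | 'GlobalWindow' >> beam.WindowInto(GlobalWindows())
        # A GroupByKey on partition_id would follow; its firing/watermark
        # behavior in streaming is the part we find hard to reason about.
    )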

We find it non-trivial to reason about watermark assignment, especially when
taking into consideration that:

   1. there might be restarts at any given time, and

   2. advancing the watermark in one Kafka partition might result in:

      1. dropping elements from other Kafka partitions (if we’re not following
         the native Flink approach where we take the lowest watermark when
         merging streams), or

      2. delaying output from other Kafka partitions, since they’ll be
         buffered.

Is there any recommendation on how this should be handled?

In the direction of using a StatefulDoFn to buffer and reorder, we’re
concerned about performance, since we need to serialize and deserialize the
entire BagState (with all the messages) every time we process a message, and
we would potentially need to insert this StatefulDoFn in multiple places in
the pipeline. Is there any benchmark result of a pipeline that does something
similar for us to reference?

The proposal for a sorted state API sounds promising; is there a ticket/doc
that we can follow?


On Wed, Jun 10, 2020 at 1:28 PM Reuven Lax <re...@google.com> wrote:

> I don't know how well RequiresTimeSortedInput will work for any late data.
>
> I think you will want to include the Kafka offset in your records (unless
> the records have their own sequence number) and then use state to buffer
> and sort. There is a proposal (and work in progress) for a sorted state
> API, which will make this easier and more efficient.
>
> Reuven
>
> On Wed, Jun 10, 2020 at 1:25 PM Luke Cwik <lc...@google.com> wrote:
>
>> For runners that support @RequiresTimeSortedInput, all your input will
>> come time sorted (as long as your element's timestamp tracks the order that
>> you want).
>> For runners that don't support this, you need to build a StatefulDoFn
>> that buffers out of order events and reorders them to the order that you
>> need.
>>
>> @Pablo Estrada <pa...@google.com> Any other suggestions for supporting
>> CDC type pipelines?
>>
>> On Tue, Jun 9, 2020 at 6:59 PM Catlyn Kong <ca...@yelp.com> wrote:
>>
>>> Thanks a lot for the response!
>>>
>>> We have several business use cases that rely strongly on ordering by
>>> Kafka offset:
>>> 1) streaming unwindowed inner join: say we want to join users with
>>> reviews on user_id. Here are the schemas for two streams:
>>>     user:
>>>
>>>    - user_id
>>>    - name
>>>    - timestamp
>>>
>>>     reviews:
>>>
>>>    - review_id
>>>    - user_id
>>>    - timestamp
>>>
>>> Here are the messages in each stream ordered by kafka offset:
>>>     user:
>>>     (1, name_a, 60), (2, name_b, 120), (1, name_c, 240)
>>>     reviews:
>>>     (ABC, 1, 90), (DEF, 2, 360)
>>> I would expect to receive the following output messages:
>>>     (1, name_a, ABC) at timestamp 90
>>>     (1, name_c, ABC) at timestamp 240
>>>     (2, name_b, DEF) at timestamp 360
>>> This can be done in native Flink since the Flink Kafka consumer reads from
>>> each partition sequentially. But without an ordering guarantee, we can end
>>> up with arbitrary results. So how would we implement this in Beam?
>>> 2) unwindowed aggregation: aggregate all the employees for every
>>> organization. Say we have a new employee stream with the following schema:
>>>     new_employee:
>>>
>>>    - organization_id
>>>    - employee_name
>>>
>>> And here are the messages ordered by Kafka offset:
>>> (1, name_a), (2, name_b), (2, name_c), (1, name_d)
>>> I would expect the output to be:
>>> (1, [name_a]), (2, [name_b]), (2, [name_b, name_c]), (1, [name_a,
>>> name_d])
>>> Again, without an ordering guarantee, the result is non-deterministic.
>>>
>>> Change data capture (CDC) streams are a very common use case for our
>>> data pipeline. As in the examples above, we rely on Kafka offsets to make
>>> sure we process data mutations in the proper order. While in some cases we
>>> have Flink native solutions to these problems (Flink provides ordering
>>> guarantees within the chosen key), we are now building some new Beam
>>> applications that would require ordering guarantees. What is the
>>> recommended approach in Beam for such use cases? If this isn’t currently
>>> supported, do we have any near-term plan to add native ordering support in Beam?
>>>
>>>
>>> On 2020/06/09 20:37:22, Luke Cwik <l....@google.com> wrote:
>>> > This will likely break due to:
>>> > * workers can have more than one thread and hence process the source in
>>> > parallel
>>> > * splitting a source allows for the source to be broken up into multiple
>>> > restrictions, and hence the runner can process those restrictions in any
>>> > order it wants. (Let's say your Kafka partition has an unconsumed commit
>>> > offset range [20, 100); this could be split into [20, 60) and [60, 100),
>>> > and the [60, 100) offset range could be processed first.)
>>> >
>>> > You're right that you need to sort the output however you want within
>>> > your DoFn before you make external calls to Kafka (this prevents you from
>>> > using the KafkaIO sink implementation as a transform). There is an
>>> > annotation, @RequiresTimeSortedInput, which is a special case for this
>>> > sorting if you want it to be sorted by the element's timestamp, but you'll
>>> > still need to write to Kafka directly yourself from your DoFn.
>>> >
>>> > On Mon, Jun 8, 2020 at 4:24 PM Hadi Zhang <ha...@yelp.com> wrote:
>>> >
>>> > > We are using the Beam 2.20 Python SDK on a Flink 1.9 runner. Our
>>> > > messages originate from a custom source that consumes messages from a
>>> > > Kafka topic and emits them in the order of their Kafka offsets to a
>>> > > DoFn. After this DoFn processes the messages, they are emitted to a
>>> > > custom sink that sends messages to a Kafka topic.
>>> > >
>>> > > We want to process those messages in the order in which we receive
>>> > > them from Kafka and then emit them to the Kafka sink in the same
>>> > > order, but based on our understanding Beam does not provide an
>>> > > in-order transport. However, in practice we noticed that with a Python
>>> > > SDK worker on Flink, a parallelism setting of 1, and one sdk_worker
>>> > > instance, messages seem to be both processed and emitted in order. Is
>>> > > that implementation-specific in-order behavior something that we can
>>> > > rely on, or is it very likely that this will break at some future
>>> > > point?
>>> > >
>>> > > In case it's not recommended to depend on that behavior, what is the
>>> > > best approach for in-order processing?
>>> > >
>>> > > https://stackoverflow.com/questions/45888719/processing-total-ordering-of-events-by-key-using-apache-beam
>>> > > recommends ordering events in a heap, but according to our
>>> > > understanding this approach will only work when directly writing to an
>>> > > external system.
>>> > >
>>> >
>>>
>>

Re: [External] Re: Ensuring messages are processed and emitted in-order

Posted by Catlyn Kong <ca...@yelp.com>.
Thanks for all the ideas! We looked around and found that
@RequiresTimeSortedInput is not supported on the portable Flink runner for
streaming pipelines
<https://github.com/apache/beam/blob/cd67dbf1c0209824a06f764f3a1d4c591441c5c8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPortableRunnerUtils.java#L46>
due to BEAM-8460 <https://issues.apache.org/jira/browse/BEAM-8460>. Here are
the short-term and long-term plans we’ve come up with, and I'm hoping to get
some insights from you:

1) short term: Use BagState with a Timer, similar to the pseudo code in the
@RequiresTimeSortedInput design doc, but sort by offset instead, although
this will only work for pipelines that write to external systems directly
(rough sketch below). Just want to confirm that BEAM-8460 won’t be a blocker
in this case?
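
To make sure we’re describing the same thing, here is a rough sketch of the
kind of stateful DoFn we have in mind for the short-term plan (the coders, the
flush delay, and write_to_external_system are all placeholders, and this
assumes state and event-time timers behave as expected on the portable Flink
runner):

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder, TupleCoder, VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer


def write_to_external_system(offset, payload):
    # Placeholder for a direct write to the external sink (e.g. a Kafka producer).
    print(offset, payload)


class SortByOffsetAndWriteDoFn(beam.DoFn):
    # Input is keyed by partition_id: (partition_id, (offset, payload)).
    BUFFER = BagStateSpec('buffer', TupleCoder((VarIntCoder(), StrUtf8Coder())))
    FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

    def process(self,
                element,
                timestamp=beam.DoFn.TimestampParam,
                buffer=beam.DoFn.StateParam(BUFFER),
                flush=beam.DoFn.TimerParam(FLUSH)):
        _partition, (offset, payload) = element
        buffer.add((offset, payload))
        # Ask for a callback once the watermark passes this element's
        # timestamp plus a small slack (the 5 seconds here is arbitrary).
        flush.set(timestamp + 5)

    @on_timer(FLUSH)
    def flush_buffer(self, buffer=beam.DoFn.StateParam(BUFFER)):
        pending = sorted(buffer.read())  # sort buffered messages by offset
        buffer.clear()
        for offset, payload in pending:
            # Write directly to the external system, which is why this only
            # helps pipelines that sink externally.
            write_to_external_system(offset, payload)

We would apply it as keyed_by_partition | beam.ParDo(SortByOffsetAndWriteDoFn()),
and the BagState serialization concern from the earlier mail obviously still
applies.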

2) long term: patch the sdk_worker code in our Beam fork to avoid the things
that are currently causing out-of-order processing. @Luke Cwik pointed out the
reasons why using one sdk_worker instance won’t guarantee in-order processing:
* workers can have more than one thread and hence process the source in
parallel
* splitting a source allows for the source to be broken up into multiple
restrictions, and hence the runner can process those restrictions in any
order it wants.
For bullet point one, we’re planning on switching back to a bounded
ThreadPoolExecutor of size 1, basically reverting this change:
https://github.com/apache/beam/pull/9477/
<https://github.com/apache/beam/pull/9477/files>. To limit the performance
degradation caused by this, we’re relying on the assumption that after a
GroupByKey, all messages of a certain key will go to the same sdk_worker
(please correct me if this is wrong). This gives us a chance to spin up as
many sdk_worker processes as the number of keys.

For bullet point two, we’re using our own non-KafkaIO based Kafka consumer
that’s not splittable by design and reads each partition in offset order,
so this won’t be a concern for us.

The main question for the long-term solution: are there other things currently
causing out-of-order processing besides the two mentioned above?


On Thu, Jun 11, 2020 at 9:10 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> I'm afraid @RequiresTimeSortedInput currently does not fit this
> requirement, because it works on event _timestamps_ only. Assigning Kafka
> offsets as event timestamps is probably not a good idea. In the original
> proposal [1] there is mention that it would be good to implement sorting by
> some other field (which would be required to correlate with timestamps, but
> that should be the case for Kafka offsets). Unfortunately, that is not yet
> implemented. If you run your pipeline in streaming mode, it should be
> possible to implement this yourself using BagState and Timer that will fire
> periodically, gather all elements with timestamp less than 'timerTimestamp
> - allowed lateness', sort them by Kafka offset and process. I wouldn't
> recommend relying on Kafka offsets not having gaps, because they can have
> gaps (at least for compacted topics, but very likely in other cases as well).
>
> Jan
>
> [1]
> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing
> On 6/11/20 5:30 PM, Reuven Lax wrote:
>
> I would not recommend using RequiresTimeSortedInput in this way. I also
> would not use ingestion time, as in a distributed environment, time skew
> between workers might mess up the order.
>
> I will ping the discussion on the sorted state API and add you. My hope is
> that this can be implemented efficiently soon, though efficient
> implementation will definitely be runner dependent. If you are using Flink,
> we'll have to figure out how to implement this efficiently using Flink.
> Dataflow is planning on providing native support for sorted state. I don't
> know if Flink has native support for this, so it might have to be emulated
> using its existing state primitives.
>
> In the meanwhile, I would suggest using bagstate along with timers. The
> timer can periodically pull sorted messages out of the bag (you can use
> watermark timers here) and write back the messages that still have gaps in
> them.
>
> Reuven
>

Re: [External] Re: Ensuring messages are processed and emitted in-order

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

I'm afraid @RequiresTimeSortedInput currently does not fit this 
requirement, because it works on event _timestamps_ only. Assigning 
Kafka offsets as event timestamps is probably not a good idea. In the 
original proposal [1] there is mention that it would be good to 
implement sorting by some other field (which would be required to 
correlate with timestamps, but that should be the case for Kafka 
offsets). Unfortunately, that is not yet implemented. If you run your 
pipeline in streaming mode, it should be possible to implement this 
yourself using BagState and Timer that will fire periodically, gather 
all elements with timestamp less than 'timerTimestamp - allowed 
lateness', sort them by Kafka offset and process. I wouldn't recommend
relying on Kafka offsets not having gaps, because they can have gaps (at least
for compacted topics, but very likely in other cases as well).

Jan

[1] 
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing


Re: [External] Re: Ensuring messages are processed and emitted in-order

Posted by Reuven Lax <re...@google.com>.
I would not recommend using RequiresTimeSortedInput in this way. I also
would not use ingestion time, as in a distributed environment, time skew
between workers might mess up the order.

I will ping the discussion on the sorted state API and add you. My hope is
that this can be implemented efficiently soon, though efficient
implementation will definitely be runner dependent. If you are using Flink,
we'll have to figure out how to implement this efficiently using Flink.
Dataflow is planning on providing native support for sorted state. I don't
know if Flink has native support for this, so it might have to be emulated
using its existing state primitives.

In the meanwhile, I would suggest using bagstate along with timers. The
timer can periodically pull sorted messages out of the bag (you can use
watermark timers here) and write back the messages that still have gaps in
them.
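
For example (just a sketch; the state/timer wiring is omitted, the names are
made up, and next_expected would be whatever "next offset" value you track per
key), the timer callback could split the bag's contents like this:

def split_at_gap(buffered, next_expected):
    """Split buffered (offset, payload) pairs into the contiguous run starting
    at next_expected (ready to emit in order) and the rest (to write back to
    the bag until the missing offsets arrive)."""
    ready, held_back = [], []
    for offset, payload in sorted(buffered):
        if offset == next_expected:
            ready.append((offset, payload))
            next_expected += 1
        elif offset < next_expected:
            continue  # duplicate of something already emitted; drop it
        else:
            held_back.append((offset, payload))
    return ready, held_back, next_expected


# Offset 4 is still missing, so 5 and 6 stay buffered for a later firing.
ready, held, nxt = split_at_gap([(2, 'a'), (3, 'b'), (5, 'c'), (6, 'd')], 2)
assert ready == [(2, 'a'), (3, 'b')]
assert held == [(5, 'c'), (6, 'd')]
assert nxt == 4

If offsets can have permanent gaps (e.g. for compacted topics), you would also
want some timeout after which you stop waiting for the missing offsets.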

Reuven
