You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Hadi Zhang <ha...@yelp.com> on 2020/06/08 23:23:39 UTC

Ensuring messages are processed and emitted in-order

We are using the Beam 2.20 Python SDK on a Flink 1.9 runner. Our
messages originate from a custom source that consumes messages from a
Kafka topic and emits them in the order of their Kafka offsets to a
DoFn. After this DoFn processes the messages, they are emitted to a
custom sink that sends messages to a Kafka topic.

We want to process those messages in the order in which we receive
them from Kafka and then emit them to the Kafka sink in the same
order, but based on our understanding Beam does not provide an
in-order transport. However, in practice we noticed that with a Python
SDK worker on Flink and a parallelism setting of 1 and one sdk_worker
instance, messages seem to be both processed and emitted in order. Is
that implementation-specific in-order behavior something that we can
rely on, or is it very likely that this will break at some future
point?

In case it's not recommended to depend on that behavior what is the
best approach for in-order processing?
https://stackoverflow.com/questions/45888719/processing-total-ordering-of-events-by-key-using-apache-beam
recommends to order events in a heap, but according to our
understanding this approach will only work when directly writing to an
external system.

Re: Ensuring messages are processed and emitted in-order

Posted by Luke Cwik <lc...@google.com>.
This will likely break due to:
* workers can have more then one thread and hence process the source in
parallel
* splitting a source allows for the source to be broken up into multiple
restrictions and hence the runner can process those restrictions in any
order they want. (lets say your kafka partition has unconsumed commit
offset range [20, 100), this could be split into [20, 60), [60, 100) and
the [60, 100) offset range could be processed first)

You're right that you need to sort the output however you want within your
DoFn before you make external calls to Kafka (this prevents you from using
the KafkaIO sink implementation as a transform). There is an annotation
@RequiresTimeSortedInput which is a special case for this sorting if you
want it to be sorted by the elements timestamp but still you'll need to
write to Kafka directly yourself from your DoFn.

On Mon, Jun 8, 2020 at 4:24 PM Hadi Zhang <ha...@yelp.com> wrote:

> We are using the Beam 2.20 Python SDK on a Flink 1.9 runner. Our
> messages originate from a custom source that consumes messages from a
> Kafka topic and emits them in the order of their Kafka offsets to a
> DoFn. After this DoFn processes the messages, they are emitted to a
> custom sink that sends messages to a Kafka topic.
>
> We want to process those messages in the order in which we receive
> them from Kafka and then emit them to the Kafka sink in the same
> order, but based on our understanding Beam does not provide an
> in-order transport. However, in practice we noticed that with a Python
> SDK worker on Flink and a parallelism setting of 1 and one sdk_worker
> instance, messages seem to be both processed and emitted in order. Is
> that implementation-specific in-order behavior something that we can
> rely on, or is it very likely that this will break at some future
> point?
>
> In case it's not recommended to depend on that behavior what is the
> best approach for in-order processing?
>
> https://stackoverflow.com/questions/45888719/processing-total-ordering-of-events-by-key-using-apache-beam
> recommends to order events in a heap, but according to our
> understanding this approach will only work when directly writing to an
> external system.
>