Posted to dev@beam.apache.org by Daniel Robert <da...@acm.org> on 2020/01/06 22:25:19 UTC

Re: RabbitMQ and CheckpointMark feasibility

Alright, a bit late but this took me a while.

Thanks for all the input so far. I have rewritten much of the RabbitMq 
IO connector and have it ready to go in a draft pr: 
https://github.com/apache/beam/pull/10509

This should incorporate a lot of what's been discussed here, in terms of 
watermarking, serialization, error handling, etc. It also clarifies and 
cleans up a lot of very confusing documentation/API settings pertaining 
to 'queues vs exchanges', and adds clarifying documentation on various 
valid AMQP paradigms.

Watermarking/timestamp management is mostly stolen from KafkaIO and 
modified as appropriate.

This also does a lot to improve resource management in terms of 
Connection and Channel usage, largely modeled after JdbcIO's 
ConnectionHandlerProvider concept.
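
For context while reviewing: the provider shape is roughly the following
(an illustrative sketch only; the real names and error handling live in
the PR):

    import java.io.Serializable;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    // Serializable handle that lazily (re)builds the non-serializable
    // Connection on each worker and hands out Channels from it.
    class ConnectionHandler implements Serializable {
      private final String uri;
      private transient Connection connection;

      ConnectionHandler(String uri) { this.uri = uri; }

      synchronized Channel createChannel() throws Exception {
        if (connection == null || !connection.isOpen()) {
          ConnectionFactory factory = new ConnectionFactory();
          factory.setUri(uri);
          connection = factory.newConnection();
        }
        return connection.createChannel();
      }
    }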

I'm not entirely sure how best to proceed from here, hence the email. 
It's a huge PR, but it has no specific backing ticket (it should), and 
historically there haven't been many eyes on RabbitMq PRs.

Thanks,
-Danny

On 11/14/19 4:13 PM, Jan Lukavský wrote:
>
> On 11/14/19 9:50 PM, Daniel Robert wrote:
>
>> Alright, thanks everybody. I'm really appreciative of the 
>> conversation here. I think I see where my disconnect is and how this 
>> might all work together for me. There are some bugs in the current 
>> rabbit implementation that I think have confused my understanding of 
>> the intended semantics. I'm coming around to seeing how such a system 
>> with rabbit's restrictions can work properly in Beam (I'd totally 
>> forgotten about 'dedupe' support in Beam) but I want to clarify some 
>> implementation questions after pulling everyone's notes together.
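>>
>> (For reference, the dedupe hooks I mean are
>> UnboundedSource#requiresDeduping and UnboundedReader#getCurrentRecordId;
>> a minimal sketch, where the stable record id is an assumption --
>> delivery tags won't work, since they're channel-scoped:)
>>
>>     @Override
>>     public boolean requiresDeduping() {
>>       return true; // ask the runner to dedupe redelivered messages
>>     }
>>
>>     // On the reader: a stable, redelivery-proof id per message,
>>     // e.g. derived from a message-id header or the payload itself.
>>     @Override
>>     public byte[] getCurrentRecordId() {
>>       return currentRecordId; // hypothetical field
>>     }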
>>
>> RabbitMQ reader should not bother accepting an existing 
>> CheckpointMark in its constructor (in 'ack-based' systems this is 
>> unnecessary per Eugene's original reply). It should construct its own 
>> CheckpointMark at construction time and use it throughout its lifecycle.
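>>
>> Concretely, I'd expect createReader to end up shaped like this (a
>> sketch; the class names are placeholders):
>>
>>     @Override
>>     public UnboundedReader<RabbitMqMessage> createReader(
>>         PipelineOptions options, RabbitMQCheckpointMark ignored) {
>>       // 'ignored' may be a deserialized mark with a dead channel; per
>>       // Eugene's earlier reply, an ack-based source needs nothing from it.
>>       return new RabbitMQReader(this);
>>     }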
>>
>> At some point later, the CheckpointMark will be 'finalized'. If this 
>> CheckpointMark has been serialized (via Coder or otherwise) or its 
>> underlying connection has been severed, this step will fail. That 
>> would mean the messages are eventually redelivered to Beam on some 
>> other Reader, so there is no data loss. If it has not been serialized, 
>> the acks will take place just fine, even if much later.
>>
>> If the system is using processing-time as event-time, however, the 
>> redelivery of these messages would effectively change the ordering 
>> and potentially the window they arrive in. I *believe* Beam deduping 
>> is managed per-window, so if 'finalizeCheckpoint' is attempted (and 
>> fails), would these messages appear in a new window?
>>
> This is very likely to happen with any source that assigns something 
> like *now* as the event time. That is ill-defined, and if the source 
> cannot provide some retry-persistent estimate of the real event time, 
> then I'd suggest forcing the user to specify a UDF to extract the 
> event time from the payload. Everything else would probably break (at 
> least if any timestamp-related windowing is used in the pipeline).
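>
> Something like this hypothetical builder hook is what I have in mind
> (withTimestampFn does not exist on RabbitMqIO today, and the header
> accessor on RabbitMqMessage is likewise an assumption):
>
>     RabbitMqIO.read()
>         .withUri("amqp://localhost")
>         .withQueue("events")
>         .withTimestampFn(
>             (RabbitMqMessage m) ->
>                 new Instant(Long.parseLong(
>                     m.getHeaders().get("eventTimeMillis").toString())));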
>>
>> Perhaps my questions are now:
>> - how should a CheckpointMark communicate failure to Beam?
>>
> An exception thrown should fail the checkpoint and therefore retry 
> everything from the last checkpoint.
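>
> In code, that just means letting the failure escape from
> finalizeCheckpoint; a sketch (channel and deliveryTags being fields of
> the mark itself):
>
>     @Override
>     public void finalizeCheckpoint() throws IOException {
>       if (channel == null || !channel.isOpen()) {
>         // e.g. the mark was serialized, or the connection dropped
>         throw new IOException(
>             "channel lost; RabbitMQ will redeliver unacked messages");
>       }
>       for (long tag : deliveryTags) {
>         channel.basicAck(tag, false); // a failure here also fails the mark
>       }
>       deliveryTags.clear();
>     }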
>>
>> - how does Beam handle a CheckpointMark.finalizeCheckpoint failure, 
>> if the API dictates such a thing?
>>
> See above.
>>
>> - is there a provision that would need to be made for processing-time 
>> sources that can fail a checkpointmark.finalizeCheckpoint call? (I'm 
>> nervous redelivered messages would appear in another window)
>>
> You are nervous for a reason. :) I strongly believe a processing-time 
> source should be considered an anti-pattern, at least in situations 
> where there is any time manipulation downstream (time-windows, stateful 
> processing, ...).
>>
>> - What is the relationship lifecycle-wise between a CheckpointMark 
>> and a Reader? My understanding is a CheckpointMark may outlive a 
>> Reader, is that correct?
>>
> Definitely. But the same instance bound to the lifecycle of the reader 
> would be used to finalizeCheckpoint (if that ever happens).
>>
>> Thanks for bearing with me everyone. It feels a bit unfortunate my 
>> first foray into beam is reliant on this rabbit connector but I'm 
>> learning a lot and I'm very grateful for the help. PRs pending once I 
>> get this all straightened out in my head.
>>
>> -Danny
>>
>> On 11/14/19 2:35 PM, Eugene Kirpichov wrote:
>>> Hi Daniel,
>>>
>>>
>>>     On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert
>>>     <daniel.robert@acm.org> wrote:
>>>
>>>     I believe I've nailed down a situation that happens in practice
>>>     that causes Beam and Rabbit to be incompatible. It seems that
>>>     runners can and do make assumptions about the serializability
>>>     (via Coder) of a CheckpointMark.
>>>
>>>     To start, these are the semantics of RabbitMQ:
>>>
>>>     - the client establishes a connection to the server
>>>     - client opens a channel on the connection
>>>     - messages are either pulled or pushed to the client from the
>>>     server along this channel
>>>     - when messages are done processing, they are acknowledged
>>>     *client-side* and must be acknowledged on the *same channel*
>>>     that originally received the message.
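>>>
>>>     In terms of the raw Java client, that constraint looks like the
>>>     following self-contained sketch (queue name and broker URI are
>>>     assumed):
>>>
>>>         import com.rabbitmq.client.*;
>>>
>>>         public class AckDemo {
>>>           public static void main(String[] args) throws Exception {
>>>             ConnectionFactory factory = new ConnectionFactory();
>>>             factory.setUri("amqp://localhost");
>>>             try (Connection conn = factory.newConnection()) {
>>>               Channel channel = conn.createChannel();
>>>               GetResponse resp = channel.basicGet("some-queue", false);
>>>               if (resp != null) {
>>>                 long tag = resp.getEnvelope().getDeliveryTag();
>>>                 // The ack must happen on THIS channel; the same tag
>>>                 // on any other channel yields 406 PRECONDITION_FAILED.
>>>                 channel.basicAck(tag, false);
>>>               }
>>>             }
>>>           }
>>>         }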
>>>
>>>     Since a channel (or any open connection) is non-serializable, it
>>>     means that a CheckpointMark that has been serialized cannot ever
>>>     be used to acknowledge these messages and correctly 'finalize'
>>>     the checkpoint. It also, as previously discussed in this thread,
>>>     implies a rabbit Reader cannot accept an existing CheckpointMark
>>>     at all; the Reader and the CheckpointMark must share the same
>>>     connection to the rabbit server ("channel").
>>>
>>> This is correct.
>>>
>>>     Next, I've found how DirectRunner (and presumably others) can
>>>     attempt to serialize a CheckpointMark that has not been
>>>     finalized. In
>>>     https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
>>>     the DirectRunner applies a probability and if it hits, it sets
>>>     the current reader to 'null' but retains the existing
>>>     CheckpointMark, which it then attempts to pass to a new reader
>>>     via a Coder.
>>>
>>> Correct, this simulates a failure scenario:
>>> - Runner was reading the source and, after finalizing a bunch of 
>>> previous CheckpointMarks, obtained a new one and serialized it so 
>>> things can be restored in case of failure
>>> - A failure happened before the current CheckpointMark could be 
>>> finalized, which means Beam was not able to guarantee that elements 
>>> after the last-finalized mark have been durably processed, so we may 
>>> need to re-read them, so runner recreates a reader from the current 
>>> mark.
>>>
>>>     This leaves the shard, the runner, and the reader with differing
>>>     views of the world. In UnboundedReadEvaluatorFactory's
>>>     processElement function, a call to getReader(shard) (
>>>     https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
>>>     ) clones the shard's checkpoint mark and passes that to the new
>>>     reader. The reader ignores it, creating its own, but even if it
>>>     accepted it, it would be accepting a serialized CheckpointMark,
>>>     which wouldn't work.
>>>
>>> Correct in the sense that for a RabbitMQ reader, a CheckpointMark 
>>> doesn't affect what the reader will read: it depends only on the 
>>> broker's internal state (which in turn depends on which messages 
>>> have been acked by previous finalized CheckpointMark's).
>>>
>>>     Later, the runner calls finishRead (
>>>     https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
>>>     ). The shard's CheckpointMark (unserialized; which should still
>>>     be valid) is finalized. The reader's CheckpointMark (which may
>>>     be a different instance) becomes the return value, which is
>>>     referred to as "finishedCheckpoint" in the calling code, which
>>>     is misleading at best and problematic at worst as *this*
>>>     checkpoint has not been finalized.
>>>
>>> I'm not following what the problem is here. In that code, "oldMark" 
>>> is the last checkpoint mark to be finalized - calling 
>>> finalizeCheckpoint on it signals that Beam has durably processed all 
>>> the messages read from the reader until that mark. "mark" (the new 
>>> one) represents the state of the reader after the last finalized 
>>> mark, so it should not be finalized.
>>>
>>> I.e. AFAIR in a hypothetical runner (which DirectRunner tries to 
>>> emulate) things go like this:
>>>
>>> Create a reader
>>> Let mark M1 = reader.getCheckpointMark()
>>> Durably persist M1 as the "restore point" of this reader
>>> ...read messages A B C from reader and durably process them...
>>> Finalize M1 (acks A B C)
>>> Let mark M2 = reader.getCheckpointMark()
>>> Durably persist M2 as the "restore point" of this reader
>>> ...read messages D E F and durably process them...
>>> Finalize M2 (acks D E F)
>>>
>>> Now let's imagine a failure.
>>> Durably persist M2 as the "restore point" of this reader
>>> ...read messages D E, and then a failure happens
>>> Recreate reader from M2 (reader ignores M2 but it doesn't matter)
>>> Since M2 was not finalized, messages D E F were not acked, and 
>>> RabbitMQ will redeliver them to this reader. D E will be processed 
>>> twice, but only the results of this new processing will be durable.
>>> Finalize M2 (acks D E F)
>>> Etc.
>>> Basically you can think of this as a series of micro-bundles, where 
>>> micro-bundles are delimited by checkpoint marks, and each 
>>> micro-bundle is a runner-side transaction which either commits or 
>>> discards the results of processing all messages in this 
>>> micro-bundle. After a micro-bundle [M1, M2) commits, the runner 
>>> calls M1.finalizeCheckpointMark() and persists M2 as the new restore 
>>> point in case of failure.
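>>>
>>> In rough pseudocode (pipelineIsRunning, durablyPersist,
>>> readSomeMessages and durablyProcess are made-up helpers standing in
>>> for runner machinery):
>>>
>>>     UnboundedReader<T> reader = source.createReader(options, restoredMark);
>>>     while (pipelineIsRunning()) {
>>>       CheckpointMark restorePoint = reader.getCheckpointMark();
>>>       durablyPersist(restorePoint);      // restore point on failure
>>>       List<T> bundle = readSomeMessages(reader);
>>>       durablyProcess(bundle);            // the micro-bundle "transaction"
>>>       restorePoint.finalizeCheckpoint(); // acks the bundle's messages
>>>     }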
>>>
>>>     So, tl;dr: I cannot find any means of maintaining a persistent
>>>     connection to the server for finalizing checkpoints that is safe
>>>     across runners. If there's a guarantee all of the shards are on
>>>     the same JVM instance, I could rely on global, static
>>>     collections/instances as a workaround, but if other runners
>>>     might serialize this across the wire, I'm stumped. The only
>>>     workable situation I can think of right now is to proactively
>>>     acknowledge messages as they are received and effectively no-op
>>>     in finalizeCheckpoint. This is very different, semantically, and
>>>     can lead to dropped messages if a pipeline doesn't finish
>>>     processing the given message.
>>>
>>>     Any help would be much appreciated.
>>>
>>> If I'm misunderstanding something above, could you describe in 
>>> detail a scenario that leads to message loss, or (less severe) to 
>>> more-than-once durable processing of the same message?
>>>
>>>     Thanks,
>>>     -Danny
>>>
>>>     On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>>>>     Hi Daniel,
>>>>
>>>>     This is probably insufficiently well documented. The
>>>>     CheckpointMark is used for two purposes:
>>>>     1) To persistently store some notion of how much of the stream
>>>>     has been consumed, so that if something fails we can tell the
>>>>     underlying streaming system where to start reading when we
>>>>     re-create the reader. This is why CheckpointMark is
>>>>     Serializable. E.g. this makes sense for Kafka.
>>>>     2) To do acks - to let the underlying streaming system know
>>>>     that the Beam pipeline will never need data up to this
>>>>     CheckpointMark. Acking does not require serializability -
>>>>     runners call ack() on the same in-memory instance of
>>>>     CheckpointMark that was produced by the reader. E.g. this makes
>>>>     sense for RabbitMq or Pubsub.
>>>>
>>>>     In practice, these two capabilities tend to be mutually
>>>>     exclusive: some streaming systems can provide a serializable
>>>>     CheckpointMark, some can do acks, some can do neither - but
>>>>     very few (or none) can do both, and it's debatable whether it
>>>>     even makes sense for a system to provide both capabilities:
>>>>     usually acking is an implicit form of streaming-system-side
>>>>     checkpointing, i.e. when you re-create the reader you don't
>>>>     actually need to carry over any information from an old
>>>>     CheckpointMark - the necessary state (which records should be
>>>>     delivered) is maintained on the streaming system side.
>>>>
>>>>     These two are lumped together into one API simply because that
>>>>     was the best design option we came up with (not for lack of
>>>>     trying, but suggestions very much welcome - AFAIK nobody is
>>>>     happy with it).
>>>>
>>>>     RabbitMQ is under #2 - it can't do serializable checkpoint
>>>>     marks, but it can do acks. So you can simply ignore the
>>>>     non-serializability.
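>>>>
>>>>     To make the contrast concrete, a simplified sketch of the two
>>>>     shapes (illustrative only, not real connector code):
>>>>
>>>>         // Purpose 1, Kafka-style: state travels in the mark itself.
>>>>         class OffsetMark implements UnboundedSource.CheckpointMark, Serializable {
>>>>           final Map<Integer, Long> nextOffsets; // survives serialization
>>>>           OffsetMark(Map<Integer, Long> o) { this.nextOffsets = o; }
>>>>           @Override public void finalizeCheckpoint() {} // nothing to ack
>>>>         }
>>>>
>>>>         // Purpose 2, RabbitMQ-style: state lives broker-side; the
>>>>         // mark's only job is to ack on its (transient) channel.
>>>>         class AckMark implements UnboundedSource.CheckpointMark, Serializable {
>>>>           transient Channel channel;   // gone after serialization
>>>>           final List<Long> tags = new ArrayList<>();
>>>>           @Override public void finalizeCheckpoint() throws IOException {
>>>>             for (long tag : tags) { channel.basicAck(tag, false); }
>>>>           }
>>>>         }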
>>>>
>>>>     On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
>>>>     <daniel.robert@acm.org> wrote:
>>>>
>>>>         (Background: I recently upgraded RabbitMqIO from the 4.x to
>>>>         5.x library.
>>>>         As part of this I switched to a pull-based API rather than the
>>>>         previously-used push-based one. This has caused some nebulous
>>>>         problems, so I put up a correction PR that I think needs some
>>>>         eyes fairly quickly, as
>>>>         I'd consider master to be broken for rabbitmq right now.
>>>>         The PR keeps
>>>>         the upgrade but reverts to the same push-based
>>>>         implementation as in 4.x:
>>>>         https://github.com/apache/beam/pull/9977 )
>>>>
>>>>         Regardless, in trying to get the pull-based API to work,
>>>>         I'm finding the
>>>>         interactions between rabbitmq and beam with CheckpointMark
>>>>         to be
>>>>         fundamentally impossible to implement so I'm hoping for
>>>>         some input here.
>>>>
>>>>         CheckpointMark itself must be Serializable; presumably this
>>>>         means it gets
>>>>         shuffled around between nodes. However 'Channel', the
>>>>         tunnel through
>>>>         which it communicates with Rabbit to ack messages and
>>>>         finalize the
>>>>         checkpoint, is non-Serializable. Like most other
>>>>         CheckpointMark
>>>>         implementations, Channel is 'transient'. When a new
>>>>         CheckpointMark is
>>>>         instantiated, it's given a Channel. If an existing one is
>>>>         supplied to
>>>>         the Reader's constructor (part of the 'startReader()'
>>>>         interface), the
>>>>         channel is overwritten.
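>>>>
>>>>         Boiled down, the overwrite I'm describing looks roughly like
>>>>         this (names approximate, sketch only):
>>>>
>>>>             // In the reader's start():
>>>>             if (existingMark != null) {
>>>>               this.mark = existingMark;
>>>>               this.mark.channel = this.channel; // swap in OUR channel,
>>>>               // but any delivery tags the mark collected on the OLD
>>>>               // channel are meaningless here, so acking them fails
>>>>               // with 406 PRECONDITION-FAILED.
>>>>             } else {
>>>>               this.mark = new RabbitMQCheckpointMark(this.channel);
>>>>             }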
>>>>
>>>>         *However*, Rabbit does not support 'ack'ing messages on a
>>>>         channel other
>>>>         than the one that consumed them in the first place.
>>>>         Attempting to do so
>>>>         results in a '406 (PRECONDITION-FAILED) - unknown delivery
>>>>         tag'. (See
>>>>         https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>>>         ).
>>>>
>>>>         Truthfully, I don't really understand how the current
>>>>         implementation is
>>>>         working; it seems like a happy accident. But I'm curious if
>>>>         someone
>>>>         could help me debug and implement how to bridge the
>>>>         re-usable/serializable CheckpointMark requirement in Beam
>>>>         with this
>>>>         limitation of Rabbit.
>>>>
>>>>         Thanks,
>>>>         -Daniel Robert
>>>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Kenneth Knowles <ke...@apache.org>.
I just took a look at the PR - it is, indeed, huge. But it is probably not
too hard to review as it is mostly fresh code. It is true that there hasn't
been a ton of work on RabbitMQ, so maybe the reviewer isn't obvious. There
are 3 committers on this thread who seem to have the expertise and interest...

Kenn
