Posted to dev@beam.apache.org by Daniel Robert <da...@acm.org> on 2019/11/07 20:06:57 UTC

RabbitMQ and CheckpointMark feasibility

(Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library. 
As part of this I switched to a pull-based API rather than the 
previously-used push-based one. This has caused some nebulous problems, so I've 
put up a correction PR that I think needs some eyes fairly quickly, as 
I'd consider master to be broken for rabbitmq right now. The PR keeps 
the upgrade but reverts to the same push-based implementation as in 4.x: 
https://github.com/apache/beam/pull/9977 )

Regardless, in trying to get the pull-based API to work, I'm finding the 
interactions between rabbitmq and beam with CheckpointMark to be 
fundamentally impossible to implement so I'm hoping for some input here.

CheckpointMark itself must be Serializable; presumably this means it gets 
shuffled around between nodes. However 'Channel', the tunnel through 
which it communicates with Rabbit to ack messages and finalize the 
checkpoint, is non-Serializable. As in most other CheckpointMark 
implementations, the Channel field is 'transient'. When a new CheckpointMark is 
instantiated, it's given a Channel. If an existing one is supplied to 
the Reader's constructor (part of the 'startReader()' interface), the 
channel is overwritten.

*However*, Rabbit does not support 'ack'ing messages on a channel other 
than the one that consumed them in the first place. Attempting to do so 
results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See 
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed 
).
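
To make the shape of the problem concrete, here is a minimal sketch of what 
I'm describing. (Illustrative names only - this is not the actual RabbitMqIO 
code - but Channel#basicAck is the real client API and 
UnboundedSource.CheckpointMark is the real Beam interface.)

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import com.rabbitmq.client.Channel;
import org.apache.beam.sdk.io.UnboundedSource;

// Sketch: a CheckpointMark whose only link back to the broker is a
// non-serializable Channel held in a transient field.
class RabbitCheckpointMarkSketch implements UnboundedSource.CheckpointMark, Serializable {

  private transient Channel channel;                          // lost on serialization
  private final List<Long> deliveryTags = new ArrayList<>();  // tags awaiting ack

  RabbitCheckpointMarkSketch(Channel channel) {
    this.channel = channel;
  }

  void addDeliveryTag(long tag) {
    deliveryTags.add(tag);
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Only valid while 'channel' is still the channel that delivered these tags.
    // After a serialize/deserialize round trip the channel is null (or has been
    // replaced), and acking on any other channel fails with
    // 406 PRECONDITION-FAILED - unknown delivery tag.
    for (long tag : deliveryTags) {
      channel.basicAck(tag, false);
    }
    deliveryTags.clear();
  }
}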

Truthfully, I don't really understand how the current implementation is 
working; it seems like a happy accident. But I'm curious whether someone 
could help me understand it and work out how to bridge the 
re-usable/serializable CheckpointMark requirement in Beam with this 
limitation of Rabbit.

Thanks,
-Daniel Robert


Re: RabbitMQ and CheckpointMark feasibility

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

answers inline.

On 11/14/19 4:15 PM, Daniel Robert wrote:
>
> We may be talking past each other a bit, though I do appreciate the 
> responses.
>
> Rabbit behaves a lot like a relational database in terms of state 
> required. A connection is analogous to a database connection, and a 
> channel (poor analogy here) is similar to an open transaction. If the 
> connection is severed, the transaction will not be able to be committed.
>
> In direct response to the consumer lifecycle linked to, yes, one can 
> recover and re-establish connections, but any state maintained within 
> the previous channel is lost. If there were messages that had not 
> been acknowledged, they would have been re-delivered to some other 
> consumer as they were never acknowledged.
>
Yes, that is exactly what basically all streaming sources that commit 
one message at a time (e.g. google pubsub, mqtt, ...) do. That is no 
problem for Beam, because you have to take into account two things:

  a) if a checkpoint is taken, it is taken in a way that ensures 
exactly-once processing in downstream operators (that is actually runner 
dependent, but all major runners behave like that)

  b) some sources might redeliver messages even between 
checkpoints (for instance due to a message-confirmation timeout) - such 
sources have in common that they commit one message at a 
time (like rabbit, mqtt, or pubsub). This manifests in the need to 
override the default implementation of 
UnboundedSource#requiresDeduping, whose javadoc [1] states: 
"Returns whether this source requires explicit deduping. This is needed 
if the underlying data source can return the same record multiple times, 
such a queuing system with a pull-ack model. Sources where the records 
read are uniquely identified by the persisted state in the 
CheckpointMark do not need this."

That is probably exactly your case.

[1] 
https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/UnboundedSource.html#requiresDeduping--
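
For concreteness, a rough sketch of the two hooks involved - 
requiresDeduping() and getCurrentRecordId() are the real Beam methods, 
everything else (class names, the currentMessageId() helper) is made up for 
illustration:

import java.nio.charset.StandardCharsets;

import org.apache.beam.sdk.io.UnboundedSource;

// Sketch: a source that tells the runner to dedupe redelivered records.
abstract class DedupingRabbitSourceSketch
    extends UnboundedSource<byte[], UnboundedSource.CheckpointMark> {

  @Override
  public boolean requiresDeduping() {
    // The same record may arrive more than once (e.g. redelivery after a
    // dropped channel), so ask the runner to dedupe by record id.
    return true;
  }
}

abstract class DedupingRabbitReaderSketch
    extends UnboundedSource.UnboundedReader<byte[]> {

  @Override
  public byte[] getCurrentRecordId() {
    // Must be a stable, unique id for the current record so the runner can
    // drop duplicates; a message-id property set by the producer could serve
    // here (assumption - rabbit does not add one by itself).
    return currentMessageId().getBytes(StandardCharsets.UTF_8);
  }

  // Hypothetical helper returning the current message's unique id.
  protected abstract String currentMessageId();
}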

> "Subscription" isn't really the model in rabbit. It has advantages and 
> disadvantages when compared with kafka -- mostly out of scope here -- 
> but some quick advantages of the rabbit model: 1) it parallelizes 
> "infinitely" without any changes to server (no re-partitioning or the 
> like); 2) messages can be acknowledge in a separate order than they 
> were consumed; 3) because state is managed associated with an active 
> connection, at-least-once delivery semantics are easy to implement as 
> any disconnection will result in the messages being re-placed in the 
> queue and delivered to a new consumer. To say it's "incompatible with 
> any fault tolerant semantics" is unfair, they just aren't incompatible 
> with Beam's, as Beam is currently implemented.
>
What I meant by that is: if rabbit were not able to recover a 
'subscription' in at least an at-least-once fashion, then it would be at best 
at-most-once and thus not fault tolerant. I was pretty sure that is 
not the case.
>
> Regardless, I'm now wondering what the best path forward is. Rabbit 
> isn't unusable in Beam if the set of requirements and tradeoffs are 
> well documented. That is, there are use cases that could be properly 
> supported and some that likely can't.
>
I would really say that rabbit can be fully supported by Beam. Maybe the 
best analogy would be PubSubIO.
>
> One option would be to use a pull-based api and immediately 
> acknowledge each message as they arrive. This would effectively make 
> the CheckpointMark a no-op, other than maintaining the watermark. In a 
> pipeline that uses fixed windows (or non-session windowing) and uses a 
> runner that supports 'Drain'-style semantics (like Dataflow) this 
> should work just fine I think.
>
That would make the source not fault tolerant, because messages could 
not be redelivered.
>
> Another would be to do a best-attempt at acknowledging as late as 
> possible. This would be a hybrid approach where we attempt 
> acknowledgements in the CheckpointMark, but use a special Coder that 
> acknowledges all messages at the point the CheckpointMark is encoded. 
> I think this feels a bit unsafe and overly complex, and I'm not sure 
> it solves any real-world problems.
>
> I also feel like perhaps we should include Beam IO documentation that 
> makes it clear that an unbounded source that requires a persistent 
> connection for state tracking is not supportable by beam.
>
> Thanks,
> -Danny
>
> On 11/14/19 7:49 AM, Jan Lukavský wrote:
>> Hi, as I said, I didn't dig too deep into that, but what I saw was [1].
>> Generally, if RabbitMQ had no way to recover a subscription 
>> (which I don't think is the case), then it would not just be incompatible 
>> with beam, but actually incompatible with any fault tolerant 
>> semantics.
>>
>> [1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle
>>
>> On 14. 11. 2019 at 13:06, Daniel Robert 
>> <da...@acm.org> wrote:
>>
>>
>>     On 11/14/19 2:32 AM, Jan Lukavský wrote:
>>
>>         Hi Danny,
>>
>>         as Eugene pointed out, there are essentially two "modes of
>>         operation" of CheckpointMark. It can:
>>
>>          a) be used to somehow restore state of a reader (in call to
>>         UnboundedSource#createReader)
>>
>>          b) confirm processed elements in
>>         CheckpointMark#finalizeCheckpoint
>>
>>         If your source doesn't provide a persistent position in the data
>>         stream that can be referred to (and serialized - an example of
>>         this would be kafka offsets), then what you actually need to
>>         serialize is not the channel, but a way to restore it -
>>         e.g. by opening a new channel with a given 'consumer group
>>         name'. Then you just use this checkpoint to commit your
>>         processed data in finalizeCheckpoint.
>>
>>         Note that finalizeCheckpoint is not guaranteed to be
>>         called - that can happen in cases when an error occurs and
>>         the source has to be rewound - that is what the direct runner
>>         emulates with the probability of 'readerReuseChance'.
>>
>>         I'm reading the documentation of RabbitMQ very quickly, but
>>         if I understand it correctly, then you have to create a
>>         subscription to the broker, serialize the identifier of the
>>         subscription into the CheckpointMark and then just recover
>>         the subscription in the call to UnboundedSource#createReader.
>>         That should do the trick.
>>
>>     I have not seen any such documentation in rabbit. My
>>     understanding is it has to be the same physical connection and
>>     channel. Can you cite the source you were looking at?
>>
>>     -Danny
>>
>>         Hope this helps; sorry if I'm not using 100% correct RabbitMQ
>>         terminology - as I said, I'm not quite familiar with it.
>>
>>         Best,
>>
>>          Jan
>>
>>         On 11/14/19 5:26 AM, Daniel Robert wrote:
>>
>>             I believe I've nailed down a situation that happens in
>>             practice that causes Beam and Rabbit to be incompatible.
>>             It seems that runners can and do make assumptions about
>>             the serializability (via Coder) of a CheckpointMark.
>>
>>             To start, these are the semantics of RabbitMQ:
>>
>>             - the client establishes a connection to the server
>>             - client opens a channel on the connection
>>             - messages are either pulled or pushed to the client from
>>             the server along this channel
>>             - when messages are done processing, they are
>>             acknowledged *client-side* and must be acknowledged on
>>             the *same channel* that originally received the message.
>>
>>             Since a channel (or any open connection) is
>>             non-serializable, it means that a CheckpointMark that has
>>             been serialized cannot ever be used to acknowledge these
>>             messages and correctly 'finalize' the checkpoint. It
>>             also, as previously discussed in this thread, implies a
>>             rabbit Reader cannot accept an existing CheckpointMark at
>>             all; the Reader and the CheckpointMark must share the
>>             same connection to the rabbit server ("channel").
>>
>>             Next, I've found how DirectRunner (and presumably others)
>>             can attempt to serialize a CheckpointMark that has not
>>             been finalized. In
>>             https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
>>             the DirectRunner applies a probability and if it hits, it
>>             sets the current reader to 'null' but retains the
>>             existing CheckpointMark, which it then attempts to pass
>>             to a new reader via a Coder.
>>
>>             This puts the shard, the runner, and the reader with
>>             differing views of the world. In
>>             UnboundedReadEvaluatorFactory's processElement function,
>>             a call to getReader(shard) (
>>             https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
>>             ) clones the shard's checkpoint mark and passes that to
>>             the new reader. The reader ignores it, creating its own,
>>             but even if it accepted it, it would be accepting a
>>             serialized CheckpointMark, which wouldn't work. Later,
>>             the runner calls finishRead (
>>             https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
>>             ). The shard's CheckpointMark (unserialized; which should
>>             still be valid) is finalized. The reader's CheckpointMark
>>             (which may be a different instance) becomes the return
>>             value, which is referred to as "finishedCheckpoint" in
>>             the calling code, which is misleading at best and
>>             problematic at worst as *this* checkpoint has not been
>>             finalized.
>>
>>             So, tl;dr: I cannot find any means of maintaining a
>>             persistent connection to the server for finalizing
>>             checkpoints that is safe across runners. If there's a
>>             guarantee all of the shards are on the same JVM instance,
>>             I could rely on global, static collections/instances as a
>>             workaround, but if other runners might serialize this
>>             across the wire, I'm stumped. The only workable situation
>>             I can think of right now is to proactively acknowledge
>>             messages as they are received and effectively no-op in
>>             finalizeCheckpoint. This is very different, semantically,
>>             and can lead to dropped messages if a pipeline doesn't
>>             finish processing the given message.
>>
>>             Any help would be much appreciated.
>>
>>             Thanks,
>>             -Danny
>>
>>             On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>>
>>                 Hi Daniel,
>>
>>                 This is probably insufficiently well documented. The
>>                 CheckpointMark is used for two purposes:
>>                 1) To persistently store some notion of how much of
>>                 the stream has been consumed, so that if something
>>                 fails we can tell the underlying streaming system
>>                 where to start reading when we re-create the reader.
>>                 This is why CheckpointMark is Serializable. E.g. this
>>                 makes sense for Kafka.
>>                 2) To do acks - to let the underlying streaming
>>                 system know that the Beam pipeline will never need
>>                 data up to this CheckpointMark. Acking does not
>>                 require serializability - runners call ack() on the
>>                 same in-memory instance of CheckpointMark that was
>>                 produced by the reader. E.g. this makes sense for
>>                 RabbitMq or Pubsub.
>>
>>                 In practice, these two capabilities tend to be
>>                 mutually exclusive: some streaming systems can
>>                 provide a serializable CheckpointMark, some can do
>>                 acks, some can do neither - but very few (or none)
>>                 can do both, and it's debatable whether it even makes
>>                 sense for a system to provide both capabilities:
>>                 usually acking is an implicit form of
>>                 streaming-system-side checkpointing, i.e. when you
>>                 re-create the reader you don't actually need to carry
>>                 over any information from an old CheckpointMark - the
>>                 necessary state (which records should be delivered)
>>                 is maintained on the streaming system side.
>>
>>                 These two are lumped together into one API simply
>>                 because that was the best design option we came up
>>                 with (not for lack of trying, but suggestions very
>>                 much welcome - AFAIK nobody is happy with it).
>>
>>                 RabbitMQ is under #2 - it can't do serializable
>>                 checkpoint marks, but it can do acks. So you can
>>                 simply ignore the non-serializability.
>>
>>                 On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
>>                 <daniel.robert@acm.org
>>                 <ma...@acm.org>> wrote:
>>
>>                     (Background: I recently upgraded RabbitMqIO from
>>                     the 4.x to 5.x library.
>>                     As part of this I switched to a pull-based API
>>                     rather than the
>>                     previously-used push-based. This has caused some
>>                     nebulous problems so
>>                     put up a correction PR that I think needs some
>>                     eyes fairly quickly as
>>                     I'd consider master to be broken for rabbitmq
>>                     right now. The PR keeps
>>                     the upgrade but reverts to the same push-based
>>                     implementation as in 4.x:
>>                     https://github.com/apache/beam/pull/9977 )
>>
>>                     Regardless, in trying to get the pull-based API
>>                     to work, I'm finding the
>>                     interactions between rabbitmq and beam with
>>                     CheckpointMark to be
>>                     fundamentally impossible to implement so I'm
>>                     hoping for some input here.
>>
>>                     CheckpointMark itself must be Serializable,
>>                     presumably this means it gets
>>                     shuffled around between nodes. However 'Channel',
>>                     the tunnel through
>>                     which it communicates with Rabbit to ack messages
>>                     and finalize the
>>                     checkpoint, is non-Serializable. Like most other
>>                     CheckpointMark
>>                     implementations, Channel is 'transient'. When a
>>                     new CheckpointMark is
>>                     instantiated, it's given a Channel. If an
>>                     existing one is supplied to
>>                     the Reader's constructor (part of the
>>                     'startReader()' interface), the
>>                     channel is overwritten.
>>
>>                     *However*, Rabbit does not support 'ack'ing
>>                     messages on a channel other
>>                     than the one that consumed them in the first
>>                     place. Attempting to do so
>>                     results in a '406 (PRECONDITION-FAILED) - unknown
>>                     delivery tag'. (See
>>                     https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>
>>                     ).
>>
>>                     Truthfully, I don't really understand how the
>>                     current implementation is
>>                     working; it seems like a happy accident. But I'm
>>                     curious if someone
>>                     could help me debug and implement how to bridge the
>>                     re-usable/serializable CheckpointMark requirement
>>                     in Beam with this
>>                     limitation of Rabbit.
>>
>>                     Thanks,
>>                     -Daniel Robert
>>
>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Jan Lukavský <je...@seznam.cz>.
Just as a matter of curiosity, I wonder why it would be necessary to assign 
(local) UUIDs to RabbitMQ streams. There seem to be only two options:

  a) RabbitMQ does not support restoring a client connection (this is 
valid - many sources work like that, e.g. a plain websocket or UDP stream)

  b) it does support that (and according to the documentation and 
overall logic it seems it should)

If a) is true, then the source itself is at-most-once and there is 
actually pretty much nothing we can do about that.

If b) is true, then there has to be some sort of identifier of the 
"subscriber", "stream", "session", "consumer", whatever we call it. This 
identifier should be serializable and transferable to a different machine. 
Every other option leads to (in my view) nonsensical 
conclusions (after all, that's why web browsers have cookies and all of 
us can enjoy the pleasant view of popups asking us to accept this, 
right? :))

If there is a serializable identifier of the "stream", "session", 
"consumer", "subscriber", then the stream could be just recreated in 
call to UnboundedSource#createReader().
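
Purely to illustrate the shape of option b) - and assuming such an 
identifier existed, which is exactly what is in question in this thread - 
the checkpoint mark would only need to carry that identifier. Everything 
here is hypothetical:

import java.io.IOException;
import java.io.Serializable;

import org.apache.beam.sdk.io.UnboundedSource;

// Hypothetical: presumes the broker exposes a serializable, transferable
// subscription/consumer identifier, which RabbitMQ may not actually offer.
class SubscriptionCheckpointMarkSketch
    implements UnboundedSource.CheckpointMark, Serializable {

  private final String subscriptionId;  // the only state that crosses the wire
  private final long confirmedUpTo;     // position to confirm on finalize

  SubscriptionCheckpointMarkSketch(String subscriptionId, long confirmedUpTo) {
    this.subscriptionId = subscriptionId;
    this.confirmedUpTo = confirmedUpTo;
  }

  String subscriptionId() {
    return subscriptionId;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Re-attach to the subscription by id (on whatever worker this runs on)
    // and confirm everything up to 'confirmedUpTo'. createReader() would use
    // the same id to resume the stream.
  }
}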

One more point not yet mentioned - the logic in DirectRunner associated 
with 'readerReuseChance' (assigning the reader a null value with some 
probability) is just an emulation of faults in a real system.

Jan

On 11/14/19 7:21 PM, Reuven Lax wrote:
> Just a thought: instead of embedding the RabbitMQ streams inside the 
> checkpoint mark, could you keep a global static map of RabbitMQ 
> streams keyed by a unique UUID. Then all you have to serialize inside 
> the CheckpointMark is the UUID; you can look up the actual stream in 
> the constructor of the CheckpointMark and cache it in a volatile 
> variable that won't be serialized.
>
> This does mean that if the source shard starts up on a new machine 
> (this will happen after a crash or if a runner load balances to 
> another machine) then you cannot recover the same RabbitMQ stream. I 
> presume (hope!) that RabbitMQ must have some sort of ack timeout and will 
> redeliver the messages after a while. In this case those messages will 
> get "stuck" until RabbitMQ redelivers them, but will eventually show 
> up again on the new RabbitMQ stream. (I hope that opening a new stream 
> would not redeliver messages that had already been successfully 
> acked on the previous stream).
>
> Would this work?
>
> Reuven
>
> On Thu, Nov 14, 2019 at 7:16 AM Daniel Robert <daniel.robert@acm.org 
> <ma...@acm.org>> wrote:
>
>     We may be talking past each other a bit, though I do appreciate
>     the responses.
>
>     Rabbit behaves a lot like a relational database in terms of state
>     required. A connection is analogous to a database connection, and
>     a channel (poor analogy here) is similar to an open transaction.
>     If the connection is severed, the transaction will not be able to
>     be committed.
>
>     In direct response to the consumer lifecycle linked to, yes, one
>     can recover and re-establish connections, but any state maintained
>     within the previous channel are lost. If there were messages that
>     had not been acknowledged, they would have been re-delivered to
>     some other consumer as they were never acknowledged.
>
>     "Subscription" isn't really the model in rabbit. It has advantages
>     and disadvantages when compared with kafka -- mostly out of scope
>     here -- but some quick advantages of the rabbit model: 1) it
>     parallelizes "infinitely" without any changes to server (no
>     re-partitioning or the like); 2) messages can be acknowledge in a
>     separate order than they were consumed; 3) because state is
>     managed associated with an active connection, at-least-once
>     delivery semantics are easy to implement as any disconnection will
>     result in the messages being re-placed in the queue and delivered
>     to a new consumer. To say it's "incompatible with any fault
>     tolerant semantics" is unfair, they just aren't incompatible with
>     Beam's, as Beam is currently implemented.
>
>     Regardless, I'm now wondering what the best path forward is.
>     Rabbit isn't unusable in Beam if the set of requirements and
>     tradeoffs are well documented. That is, there are use cases that
>     could be properly supported and some that likely can't.
>
>     One option would be to use a pull-based api and immediately
>     acknowledge each message as they arrive. This would effectively
>     make the CheckpointMark a no-op, other than maintaining the
>     watermark. In a pipeline that uses fixed windows (or non-session
>     windowing) and uses a runner that supports 'Drain'-style semantics
>     (like Dataflow) this should work just fine I think.
>
>     Another would be to do a best-attempt at acknowledging as late as
>     possible. This would be a hybrid approach where we attempt
>     acknowledgements in the CheckpointMark, but use a special Coder
>     that acknowledges all messages at the point the CheckpointMark is
>     encoded. I think this feels a bit unsafe and overly complex, and
>     I'm not sure it solves any real-world problems.
>
>     I also feel like perhaps we should include Beam IO documentation
>     that makes it clear that an unbounded source that requires a
>     persistent connection for state tracking is not supportable by beam.
>
>     Thanks,
>     -Danny
>
>     On 11/14/19 7:49 AM, Jan Lukavský wrote:
>>     Hi, as I said, I didn't dig too deep into that, but what I saw
>>     was [1].
>>     Generally, if RabbitMQ had no way to recover a subscription
>>     (which I don't think is the case), then it would not just be
>>     incompatible with beam, but actually incompatible with
>>     any fault tolerant semantics.
>>
>>     [1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle
>>
>>     On 14. 11. 2019 at 13:06, Daniel Robert
>>     <da...@acm.org> <ma...@acm.org> wrote:
>>
>>
>>         On 11/14/19 2:32 AM, Jan Lukavský wrote:
>>
>>             Hi Danny,
>>
>>             as Eugene pointed out, there are essentially two "modes
>>             of operation" of CheckpointMark. It can:
>>
>>              a) be used to somehow restore state of a reader (in call
>>             to UnboundedSource#createReader)
>>
>>              b) confirm processed elements in
>>             CheckpointMark#finalizeCheckpoint
>>
>>             If your source doesn't provide a persistent position in
>>             data stream that can be referred to (and serialized -
>>             example of this would be kafka offsets), then what you
>>             actually need to serialize is not the channel, but a way
>>             how to restore it - e.g. by opening a new channel with a
>>             given 'consumer group name'. Then you just use this
>>             checkpoint to commit your processed data in
>>             finalizeCheckpoint.
>>
>>             Note that the finalizeCheckpoint is not guaranteed to be
>>             called - that can happen in cases when an error occurs
>>             and the source has to be rewind back - that is what
>>             direct runner emulates with the probability of
>>             'readerReuseChance'.
>>
>>             I'm reading the documentation of RabbitMQ very quickly,
>>             but if I understand it correctly, then you have to create
>>             a subscription to the broker, serialize identifier of the
>>             subscription into the checkpointmark and then just
>>             recover the subscription in call to
>>             UnboundedSource#createReader. That should do the trick.
>>
>>         I have not seen any such documentation in rabbit. My
>>         understanding is it has to be the same, physical connection
>>         and channel. Can you cite the source you were looking at?
>>
>>         -Danny
>>
>>             Hope this helps, sorry if I'm not using 100% correct
>>             RabbitMQ terminology as I said, I'm not quite familiar
>>             with it.
>>
>>             Best,
>>
>>              Jan
>>
>>             On 11/14/19 5:26 AM, Daniel Robert wrote:
>>
>>                 I believe I've nailed down a situation that happens
>>                 in practice that causes Beam and Rabbit to be
>>                 incompatible. It seems that runners can and do make
>>                 assumptions about the serializability (via Coder) of
>>                 a CheckpointMark.
>>
>>                 To start, these are the semantics of RabbitMQ:
>>
>>                 - the client establishes a connection to the server
>>                 - client opens a channel on the connection
>>                 - messages are either pulled or pushed to the client
>>                 from the server along this channel
>>                 - when messages are done processing, they are
>>                 acknowledged *client-side* and must be acknowledged
>>                 on the *same channel* that originally received the
>>                 message.
>>
>>                 Since a channel (or any open connection) is
>>                 non-serializable, it means that a CheckpointMark that
>>                 has been serialized cannot ever be used to
>>                 acknowledge these messages and correctly 'finalize'
>>                 the checkpoint. It also, as previously discussed in
>>                 this thread, implies a rabbit Reader cannot accept an
>>                 existing CheckpointMark at all; the Reader and the
>>                 CheckpointMark must share the same connection to the
>>                 rabbit server ("channel").
>>
>>                 Next, I've found how DirectRunner (and presumably
>>                 others) can attempt to serialize a CheckpointMark
>>                 that has not been finalized. In
>>                 https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
>>                 the DirectRunner applies a probability and if it
>>                 hits, it sets the current reader to 'null' but
>>                 retains the existing CheckpointMark, which it then
>>                 attempts to pass to a new reader via a Coder.
>>
>>                 This puts the shard, the runner, and the reader with
>>                 differing views of the world. In
>>                 UnboundedReadEvaluatorFactory's processElement
>>                 function, a call to getReader(shard) (
>>                 https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
>>                 ) clones the shard's checkpoint mark and passes that
>>                 to the new reader. The reader ignores it, creating
>>                 its own, but even if it accepted it, it would be
>>                 accepting a serialized CheckpointMark, which wouldn't
>>                 work. Later, the runner calls finishRead (
>>                 https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
>>                 ). The shard's CheckpointMark (unserialized; which
>>                 should still be valid) is finalized. The reader's
>>                 CheckpointMark (which may be a different instance)
>>                 becomes the return value, which is referred to as
>>                 "finishedCheckpoint" in the calling code, which is
>>                 misleading at best and problematic at worst as *this*
>>                 checkpoint has not been finalized.
>>
>>                 So, tl;dr: I cannot find any means of maintaining a
>>                 persistent connection to the server for finalizing
>>                 checkpoints that is safe across runners. If there's a
>>                 guarantee all of the shards are on the same JVM
>>                 instance, I could rely on global, static
>>                 collections/instances as a workaround, but if other
>>                 runners might serialize this across the wire, I'm
>>                 stumped. The only workable situation I can think of
>>                 right now is to proactively acknowledge messages as
>>                 they are received and effectively no-op in
>>                 finalizeCheckpoint. This is very different,
>>                 semantically, and can lead to dropped messages if a
>>                 pipeline doesn't finish processing the given message.
>>
>>                 Any help would be much appreciated.
>>
>>                 Thanks,
>>                 -Danny
>>
>>                 On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>>
>>                     Hi Daniel,
>>
>>                     This is probably insufficiently well documented.
>>                     The CheckpointMark is used for two purposes:
>>                     1) To persistently store some notion of how much
>>                     of the stream has been consumed, so that if
>>                     something fails we can tell the underlying
>>                     streaming system where to start reading when we
>>                     re-create the reader. This is why CheckpointMark
>>                     is Serializable. E.g. this makes sense for Kafka.
>>                     2) To do acks - to let the underlying streaming
>>                     system know that the Beam pipeline will never
>>                     need data up to this CheckpointMark. Acking does
>>                     not require serializability - runners call ack()
>>                     on the same in-memory instance of CheckpointMark
>>                     that was produced by the reader. E.g. this makes
>>                     sense for RabbitMq or Pubsub.
>>
>>                     In practice, these two capabilities tend to be
>>                     mutually exclusive: some streaming systems can
>>                     provide a serializable CheckpointMark, some can
>>                     do acks, some can do neither - but very few (or
>>                     none) can do both, and it's debatable whether it
>>                     even makes sense for a system to provide both
>>                     capabilities: usually acking is an implicit form
>>                     of streaming-system-side checkpointing, i.e. when
>>                     you re-create the reader you don't actually need
>>                     to carry over any information from an old
>>                     CheckpointMark - the necessary state (which
>>                     records should be delivered) is maintained on the
>>                     streaming system side.
>>
>>                     These two are lumped together into one API simply
>>                     because that was the best design option we came
>>                     up with (not for lack of trying, but suggestions
>>                     very much welcome - AFAIK nobody is happy with it).
>>
>>                     RabbitMQ is under #2 - it can't do serializable
>>                     checkpoint marks, but it can do acks. So you can
>>                     simply ignore the non-serializability.
>>
>>                     On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
>>                     <daniel.robert@acm.org
>>                     <ma...@acm.org>> wrote:
>>
>>                         (Background: I recently upgraded RabbitMqIO
>>                         from the 4.x to 5.x library.
>>                         As part of this I switched to a pull-based
>>                         API rather than the
>>                         previously-used push-based. This has caused
>>                         some nebulous problems so
>>                         put up a correction PR that I think needs
>>                         some eyes fairly quickly as
>>                         I'd consider master to be broken for rabbitmq
>>                         right now. The PR keeps
>>                         the upgrade but reverts to the same
>>                         push-based implementation as in 4.x:
>>                         https://github.com/apache/beam/pull/9977 )
>>
>>                         Regardless, in trying to get the pull-based
>>                         API to work, I'm finding the
>>                         interactions between rabbitmq and beam with
>>                         CheckpointMark to be
>>                         fundamentally impossible to implement so I'm
>>                         hoping for some input here.
>>
>>                         CheckpointMark itself must be Serializable,
>>                         presumably this means it gets
>>                         shuffled around between nodes. However
>>                         'Channel', the tunnel through
>>                         which it communicates with Rabbit to ack
>>                         messages and finalize the
>>                         checkpoint, is non-Serializable. Like most
>>                         other CheckpointMark
>>                         implementations, Channel is 'transient'. When
>>                         a new CheckpointMark is
>>                         instantiated, it's given a Channel. If an
>>                         existing one is supplied to
>>                         the Reader's constructor (part of the
>>                         'startReader()' interface), the
>>                         channel is overwritten.
>>
>>                         *However*, Rabbit does not support 'ack'ing
>>                         messages on a channel other
>>                         than the one that consumed them in the first
>>                         place. Attempting to do so
>>                         results in a '406 (PRECONDITION-FAILED) -
>>                         unknown delivery tag'. (See
>>                         https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>
>>                         ).
>>
>>                         Truthfully, I don't really understand how the
>>                         current implementation is
>>                         working; it seems like a happy accident. But
>>                         I'm curious if someone
>>                         could help me debug and implement how to
>>                         bridge the
>>                         re-usable/serializable CheckpointMark
>>                         requirement in Beam with this
>>                         limitation of Rabbit.
>>
>>                         Thanks,
>>                         -Daniel Robert
>>
>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Reuven Lax <re...@google.com>.
Just a thought: instead of embedding the RabbitMQ streams inside the
checkpoint mark, could you keep a global static map of RabbitMQ streams
keyed by a unique UUID? Then all you have to serialize inside the
CheckpointMark is the UUID; you can look up the actual stream in the
constructor of the CheckpointMark and cache it in a volatile variable that
won't be serialized.
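
A rough sketch of what I have in mind (class and field names are invented; 
the point is only that the UUID crosses the serialization boundary while the 
live channel stays in a worker-local map):

import java.io.IOException;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.rabbitmq.client.Channel;
import org.apache.beam.sdk.io.UnboundedSource;

// Sketch of the "global static map keyed by UUID" idea.
class UuidKeyedCheckpointMarkSketch
    implements UnboundedSource.CheckpointMark, Serializable {

  // Worker-local registry of live channels; the reader registers its channel
  // here under a UUID when it starts.
  static final ConcurrentHashMap<UUID, Channel> CHANNELS = new ConcurrentHashMap<>();

  private final UUID channelId;                 // serialized
  private final long latestDeliveryTag;         // serialized
  private transient volatile Channel channel;   // never serialized

  UuidKeyedCheckpointMarkSketch(UUID channelId, long latestDeliveryTag) {
    this.channelId = channelId;
    this.latestDeliveryTag = latestDeliveryTag;
    this.channel = CHANNELS.get(channelId);     // null if we're on another machine
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    Channel c = (channel != null) ? channel : CHANNELS.get(channelId);
    if (c != null && c.isOpen()) {
      c.basicAck(latestDeliveryTag, true);      // ack everything up to this tag
    }
    // If the shard moved to another machine the channel is gone; those messages
    // stay unacked until the broker's redelivery kicks in, as described below.
  }
}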

This does mean that if the source shard starts up on a new machine (this
will happen after a crash or if a runner load balances to another machine)
then you cannot recover the same RabbitMQ stream. I presume (hope!) that
RabbitMQ must have some sort of ack timeout and will redeliver the messages
after a while. In this case those messages will get "stuck" until RabbitMQ
redelivers them, but will eventually show up again on the new RabbitMQ
stream. (I hope that opening a new stream would not redeliver messages that
had already been successfully acked on the previous stream).

Would this work?

Reuven

On Thu, Nov 14, 2019 at 7:16 AM Daniel Robert <da...@acm.org> wrote:

> We may be talking past each other a bit, though I do appreciate the
> responses.
>
> Rabbit behaves a lot like a relational database in terms of state
> required. A connection is analogous to a database connection, and a channel
> (poor analogy here) is similar to an open transaction. If the connection is
> severed, the transaction will not be able to be committed.
>
> In direct response to the consumer lifecycle linked to, yes, one can
> recover and re-establish connections, but any state maintained within the
> previous channel are lost. If there were messages that had not been
> acknowledged, they would have been re-delivered to some other consumer as
> they were never acknowledged.
>
> "Subscription" isn't really the model in rabbit. It has advantages and
> disadvantages when compared with kafka -- mostly out of scope here -- but
> some quick advantages of the rabbit model: 1) it parallelizes "infinitely"
> without any changes to server (no re-partitioning or the like); 2) messages
> can be acknowledge in a separate order than they were consumed; 3) because
> state is managed associated with an active connection, at-least-once
> delivery semantics are easy to implement as any disconnection will result
> in the messages being re-placed in the queue and delivered to a new
> consumer. To say it's "incompatible with any fault tolerant semantics" is
> unfair, they just aren't incompatible with Beam's, as Beam is currently
> implemented.
>
> Regardless, I'm now wondering what the best path forward is. Rabbit isn't
> unusable in Beam if the set of requirements and tradeoffs are well
> documented. That is, there are use cases that could be properly supported
> and some that likely can't.
>
> One option would be to use a pull-based api and immediately acknowledge
> each message as they arrive. This would effectively make the CheckpointMark
> a no-op, other than maintaining the watermark. In a pipeline that uses
> fixed windows (or non-session windowing) and uses a runner that supports
> 'Drain'-style semantics (like Dataflow) this should work just fine I think.
>
> Another would be to do a best-attempt at acknowledging as late as
> possible. This would be a hybrid approach where we attempt acknowledgements
> in the CheckpointMark, but use a special Coder that acknowledges all
> messages at the point the CheckpointMark is encoded. I think this feels a
> bit unsafe and overly complex, and I'm not sure it solves any real-world
> problems.
>
> I also feel like perhaps we should include Beam IO documentation that
> makes it clear that an unbounded source that requires a persistent
> connection for state tracking is not supportable by beam.
>
> Thanks,
> -Danny
> On 11/14/19 7:49 AM, Jan Lukavský wrote:
>
> Hi, as I said, I didn't dig too deep into that, but what I saw was [1].
> Generally, if RabbitMQ had no way to recover a subscription (which I
> don't think is the case), then it would not just be incompatible with beam, but
> actually incompatible with any fault tolerant semantics.
>
> [1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle
>
> On 14. 11. 2019 at 13:06, Daniel Robert
> <da...@acm.org> <da...@acm.org> wrote:
>
>
> On 11/14/19 2:32 AM, Jan Lukavský wrote:
>
> Hi Danny,
>
> as Eugene pointed out, there are essentially two "modes of operation" of
> CheckpointMark. It can:
>
>  a) be used to somehow restore state of a reader (in call to
> UnboundedSource#createReader)
>
>  b) confirm processed elements in CheckpointMark#finalizeCheckpoint
>
> If your source doesn't provide a persistent position in data stream that
> can be referred to (and serialized - example of this would be kafka
> offsets), then what you actually need to serialize is not the channel, but
> a way how to restore it - e.g. by opening a new channel with a given
> 'consumer group name'. Then you just use this checkpoint to commit your
> processed data in finalizeCheckpoint.
>
> Note that the finalizeCheckpoint is not guaranteed to be called - that can
> happen in cases when an error occurs and the source has to be rewind back -
> that is what direct runner emulates with the probability of
> 'readerReuseChance'.
>
> I'm reading the documentation of RabbitMQ very quickly, but if I
> understand it correctly, then you have to create a subscription to the
> broker, serialize identifier of the subscription into the checkpointmark
> and then just recover the subscription in call to
> UnboundedSource#createReader. That should do the trick.
>
> I have not seen any such documentation in rabbit. My understanding is it
> has to be the same, physical connection and channel. Can you cite the
> source you were looking at?
>
> -Danny
>
> Hope this helps, sorry if I'm not using 100% correct RabbitMQ terminology
> as I said, I'm not quite familiar with it.
>
> Best,
>
>  Jan
> On 11/14/19 5:26 AM, Daniel Robert wrote:
>
> I believe I've nailed down a situation that happens in practice that
> causes Beam and Rabbit to be incompatible. It seems that runners can and do
> make assumptions about the serializability (via Coder) of a CheckpointMark.
>
> To start, these are the semantics of RabbitMQ:
>
> - the client establishes a connection to the server
> - client opens a channel on the connection
> - messages are either pulled or pushed to the client from the server along
> this channel
> - when messages are done processing, they are acknowledged *client-side*
> and must be acknowledged on the *same channel* that originally received the
> message.
>
> Since a channel (or any open connection) is non-serializable, it means
> that a CheckpointMark that has been serialized cannot ever be used to
> acknowledge these messages and correctly 'finalize' the checkpoint. It
> also, as previously discussed in this thread, implies a rabbit Reader
> cannot accept an existing CheckpointMark at all; the Reader and the
> CheckpointMark must share the same connection to the rabbit server
> ("channel").
>
> Next, I've found how DirectRunner (and presumably others) can attempt to
> serialize a CheckpointMark that has not been finalized. In
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
> the DirectRunner applies a probability and if it hits, it sets the current
> reader to 'null' but retains the existing CheckpointMark, which it then
> attempts to pass to a new reader via a Coder.
>
> This puts the shard, the runner, and the reader with differing views of
> the world. In UnboundedReadEvaluatorFactory's processElement function, a
> call to getReader(shard) (
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
> ) clones the shard's checkpoint mark and passes that to the new reader. The
> reader ignores it, creating its own, but even if it accepted it, it would
> be accepting a serialized CheckpointMark, which wouldn't work. Later, the
> runner calls finishRead (
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
> ). The shard's CheckpointMark (unserialized; which should still be valid)
> is finalized. The reader's CheckpointMark (which may be a different
> instance) becomes the return value, which is referred to as
> "finishedCheckpoint" in the calling code, which is misleading at best and
> problematic at worst as *this* checkpoint has not been finalized.
>
> So, tl;dr: I cannot find any means of maintaining a persistent connection
> to the server for finalizing checkpoints that is safe across runners. If
> there's a guarantee all of the shards are on the same JVM instance, I could
> rely on global, static collections/instances as a workaround, but if other
> runners might serialize this across the wire, I'm stumped. The only
> workable situation I can think of right now is to proactively acknowledge
> messages as they are received and effectively no-op in finalizeCheckpoint.
> This is very different, semantically, and can lead to dropped messages if a
> pipeline doesn't finish processing the given message.
>
> Any help would be much appreciated.
>
> Thanks,
> -Danny
> On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>
> Hi Daniel,
>
> This is probably insufficiently well documented. The CheckpointMark is
> used for two purposes:
> 1) To persistently store some notion of how much of the stream has been
> consumed, so that if something fails we can tell the underlying streaming
> system where to start reading when we re-create the reader. This is why
> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
> 2) To do acks - to let the underlying streaming system know that the Beam
> pipeline will never need data up to this CheckpointMark. Acking does not
> require serializability - runners call ack() on the same in-memory instance
> of CheckpointMark that was produced by the reader. E.g. this makes sense
> for RabbitMq or Pubsub.
>
> In practice, these two capabilities tend to be mutually exclusive: some
> streaming systems can provide a serializable CheckpointMark, some can do
> acks, some can do neither - but very few (or none) can do both, and it's
> debatable whether it even makes sense for a system to provide both
> capabilities: usually acking is an implicit form of streaming-system-side
> checkpointing, i.e. when you re-create the reader you don't actually need
> to carry over any information from an old CheckpointMark - the necessary
> state (which records should be delivered) is maintained on the streaming
> system side.
>
> These two are lumped together into one API simply because that was the
> best design option we came up with (not for lack of trying, but suggestions
> very much welcome - AFAIK nobody is happy with it).
>
> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
> can do acks. So you can simply ignore the non-serializability.
>
> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <da...@acm.org>
> wrote:
>
> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
> As part of this I switched to a pull-based API rather than the
> previously-used push-based. This has caused some nebulous problems so
> put up a correction PR that I think needs some eyes fairly quickly as
> I'd consider master to be broken for rabbitmq right now. The PR keeps
> the upgrade but reverts to the same push-based implementation as in 4.x:
> https://github.com/apache/beam/pull/9977 )
>
> Regardless, in trying to get the pull-based API to work, I'm finding the
> interactions between rabbitmq and beam with CheckpointMark to be
> fundamentally impossible to implement so I'm hoping for some input here.
>
> CheckpointMark itself must be Serializable, presumably this means it gets
> shuffled around between nodes. However 'Channel', the tunnel through
> which it communicates with Rabbit to ack messages and finalize the
> checkpoint, is non-Serializable. Like most other CheckpointMark
> implementations, Channel is 'transient'. When a new CheckpointMark is
> instantiated, it's given a Channel. If an existing one is supplied to
> the Reader's constructor (part of the 'startReader()' interface), the
> channel is overwritten.
>
> *However*, Rabbit does not support 'ack'ing messages on a channel other
> than the one that consumed them in the first place. Attempting to do so
> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>
> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
> ).
>
> Truthfully, I don't really understand how the current implementation is
> working; it seems like a happy accident. But I'm curious if someone
> could help me debug and implement how to bridge the
> re-usable/serializable CheckpointMark requirement in Beam with this
> limitation of Rabbit.
>
> Thanks,
> -Daniel Robert
>
>
>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Daniel Robert <da...@acm.org>.
We may be talking past each other a bit, though I do appreciate the 
responses.

Rabbit behaves a lot like a relational database in terms of state 
required. A connection is analogous to a database connection, and a 
channel (poor analogy here) is similar to an open transaction. If the 
connection is severed, the transaction will not be able to be committed.

In direct response to the consumer lifecycle linked to, yes, one can 
recover and re-establish connections, but any state maintained within 
the previous channel is lost. If there were messages that had not been 
acknowledged, they would have been re-delivered to some other consumer 
as they were never acknowledged.

"Subscription" isn't really the model in rabbit. It has advantages and 
disadvantages when compared with kafka -- mostly out of scope here -- 
but some quick advantages of the rabbit model: 1) it parallelizes 
"infinitely" without any changes to server (no re-partitioning or the 
like); 2) messages can be acknowledge in a separate order than they were 
consumed; 3) because state is managed associated with an active 
connection, at-least-once delivery semantics are easy to implement as 
any disconnection will result in the messages being re-placed in the 
queue and delivered to a new consumer. To say it's "incompatible with 
any fault tolerant semantics" is unfair, they just aren't incompatible 
with Beam's, as Beam is currently implemented.

Regardless, I'm now wondering what the best path forward is. Rabbit 
isn't unusable in Beam if the set of requirements and tradeoffs are well 
documented. That is, there are use cases that could be properly 
supported and some that likely can't.

One option would be to use a pull-based API and immediately acknowledge 
each message as it arrives. This would effectively make the 
CheckpointMark a no-op, other than maintaining the watermark. In a 
pipeline that uses fixed windows (or non-session windowing) and uses a 
runner that supports 'Drain'-style semantics (like Dataflow) this should 
work just fine I think.
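
Roughly the shape I have in mind for that first option (names are 
illustrative, not a proposed patch; the reader would basicAck each delivery 
as soon as it is returned, so the mark itself only carries the watermark):

import java.io.IOException;
import java.io.Serializable;

import org.apache.beam.sdk.io.UnboundedSource;
import org.joda.time.Instant;

// Sketch of "ack immediately, checkpoint only carries the watermark".
class WatermarkOnlyCheckpointMarkSketch
    implements UnboundedSource.CheckpointMark, Serializable {

  private final Instant watermark;

  WatermarkOnlyCheckpointMarkSketch(Instant watermark) {
    this.watermark = watermark;
  }

  Instant watermark() {
    return watermark;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Intentionally a no-op: messages were acked at receipt, so a failure after
    // that point can drop them (the tradeoff discussed earlier in the thread).
  }
}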

Another would be to do a best-attempt at acknowledging as late as 
possible. This would be a hybrid approach where we attempt 
acknowledgements in the CheckpointMark, but use a special Coder that 
acknowledges all messages at the point the CheckpointMark is encoded. I 
think this feels a bit unsafe and overly complex, and I'm not sure it 
solves any real-world problems.

I also feel like perhaps we should include Beam IO documentation that 
makes it clear that an unbounded source that requires a persistent 
connection for state tracking is not supportable by beam.

Thanks,
-Danny

On 11/14/19 7:49 AM, Jan Lukavský wrote:
> Hi, as I said, I didn't dig too deep into that, but what I saw was [1].
> Generally, if RabbitMQ had no way to recover a subscription 
> (which I don't think is the case), then it would not just be incompatible 
> with beam, but actually incompatible with any fault tolerant 
> semantics.
>
> [1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle
>
> On 14. 11. 2019 at 13:06, Daniel Robert 
> <da...@acm.org> wrote:
>
>
>     On 11/14/19 2:32 AM, Jan Lukavský wrote:
>
>         Hi Danny,
>
>         as Eugene pointed out, there are essentially two "modes of
>         operation" of CheckpointMark. It can:
>
>          a) be used to somehow restore state of a reader (in call to
>         UnboundedSource#createReader)
>
>          b) confirm processed elements in
>         CheckpointMark#finalizeCheckpoint
>
>         If your source doesn't provide a persistent position in data
>         stream that can be referred to (and serialized - example of
>         this would be kafka offsets), then what you actually need to
>         serialize is not the channel, but a way how to restore it -
>         e.g. by opening a new channel with a given 'consumer group
>         name'. Then you just use this checkpoint to commit your
>         processed data in finalizeCheckpoint.
>
>         Note that the finalizeCheckpoint is not guaranteed to be
>         called - that can happen in cases when an error occurs and the
>         source has to be rewind back - that is what direct runner
>         emulates with the probability of 'readerReuseChance'.
>
>         I'm reading the documentation of RabbitMQ very quickly, but if
>         I understand it correctly, then you have to create a
>         subscription to the broker, serialize identifier of the
>         subscription into the checkpointmark and then just recover the
>         subscription in call to UnboundedSource#createReader. That
>         should do the trick.
>
>     I have not seen any such documentation in rabbit. My understanding
>     is it has to be the same, physical connection and channel. Can you
>     cite the source you were looking at?
>
>     -Danny
>
>         Hope this helps, sorry if I'm not using 100% correct RabbitMQ
>         terminology as I said, I'm not quite familiar with it.
>
>         Best,
>
>          Jan
>
>         On 11/14/19 5:26 AM, Daniel Robert wrote:
>
>             I believe I've nailed down a situation that happens in
>             practice that causes Beam and Rabbit to be incompatible.
>             It seems that runners can and do make assumptions about
>             the serializability (via Coder) of a CheckpointMark.
>
>             To start, these are the semantics of RabbitMQ:
>
>             - the client establishes a connection to the server
>             - client opens a channel on the connection
>             - messages are either pulled or pushed to the client from
>             the server along this channel
>             - when messages are done processing, they are acknowledged
>             *client-side* and must be acknowledged on the *same
>             channel* that originally received the message.
>
>             Since a channel (or any open connection) is
>             non-serializable, it means that a CheckpointMark that has
>             been serialized cannot ever be used to acknowledge these
>             messages and correctly 'finalize' the checkpoint. It also,
>             as previously discussed in this thread, implies a rabbit
>             Reader cannot accept an existing CheckpointMark at all;
>             the Reader and the CheckpointMark must share the same
>             connection to the rabbit server ("channel").
>
>             Next, I've found how DirectRunner (and presumably others)
>             can attempt to serialize a CheckpointMark that has not
>             been finalized. In
>             https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
>             the DirectRunner applies a probability and if it hits, it
>             sets the current reader to 'null' but retains the existing
>             CheckpointMark, which it then attempts to pass to a new
>             reader via a Coder.
>
>             This puts the shard, the runner, and the reader with
>             differing views of the world. In
>             UnboundedReadEvaluatorFactory's processElement function, a
>             call to getReader(shard) (
>             https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
>             ) clones the shard's checkpoint mark and passes that to
>             the new reader. The reader ignores it, creating its own,
>             but even if it accepted it, it would be accepting a
>             serialized CheckpointMark, which wouldn't work. Later, the
>             runner calls finishRead (
>             https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
>             ). The shard's CheckpointMark (unserialized; which should
>             still be valid) is finalized. The reader's CheckpointMark
>             (which may be a different instance) becomes the return
>             value, which is referred to as "finishedCheckpoint" in the
>             calling code, which is misleading at best and problematic
>             at worst as *this* checkpoint has not been finalized.
>
>             So, tl;dr: I cannot find any means of maintaining a
>             persistent connection to the server for finalizing
>             checkpoints that is safe across runners. If there's a
>             guarantee all of the shards are on the same JVM instance,
>             I could rely on global, static collections/instances as a
>             workaround, but if other runners might serialize this
>             across the wire, I'm stumped. The only workable situation
>             I can think of right now is to proactively acknowledge
>             messages as they are received and effectively no-op in
>             finalizeCheckpoint. This is very different, semantically,
>             and can lead to dropped messages if a pipeline doesn't
>             finish processing the given message.
>
>             Any help would be much appreciated.
>
>             Thanks,
>             -Danny
>
>             On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>
>                 Hi Daniel,
>
>                 This is probably insufficiently well documented. The
>                 CheckpointMark is used for two purposes:
>                 1) To persistently store some notion of how much of
>                 the stream has been consumed, so that if something
>                 fails we can tell the underlying streaming system
>                 where to start reading when we re-create the reader.
>                 This is why CheckpointMark is Serializable. E.g. this
>                 makes sense for Kafka.
>                 2) To do acks - to let the underlying streaming system
>                 know that the Beam pipeline will never need data up to
>                 this CheckpointMark. Acking does not require
>                 serializability - runners call ack() on the same
>                 in-memory instance of CheckpointMark that was produced
>                 by the reader. E.g. this makes sense for RabbitMq or
>                 Pubsub.
>
>                 In practice, these two capabilities tend to be
>                 mutually exclusive: some streaming systems can provide
>                 a serializable CheckpointMark, some can do acks, some
>                 can do neither - but very few (or none) can do both,
>                 and it's debatable whether it even makes sense for a
>                 system to provide both capabilities: usually acking is
>                 an implicit form of streaming-system-side
>                 checkpointing, i.e. when you re-create the reader you
>                 don't actually need to carry over any information from
>                 an old CheckpointMark - the necessary state (which
>                 records should be delivered) is maintained on the
>                 streaming system side.
>
>                 These two are lumped together into one API simply
>                 because that was the best design option we came up
>                 with (not for lack of trying, but suggestions very
>                 much welcome - AFAIK nobody is happy with it).
>
>                 RabbitMQ is under #2 - it can't do serializable
>                 checkpoint marks, but it can do acks. So you can
>                 simply ignore the non-serializability.
>
>                 On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
>                 <daniel.robert@acm.org <ma...@acm.org>>
>                 wrote:
>
>                     (Background: I recently upgraded RabbitMqIO from
>                     the 4.x to 5.x library.
>                     As part of this I switched to a pull-based API
>                     rather than the
>                     previously-used push-based. This has caused some
>                     nebulous problems so I
>                     put up a correction PR that I think needs some
>                     eyes fairly quickly as
>                     I'd consider master to be broken for rabbitmq
>                     right now. The PR keeps
>                     the upgrade but reverts to the same push-based
>                     implementation as in 4.x:
>                     https://github.com/apache/beam/pull/9977 )
>
>                     Regardless, in trying to get the pull-based API to
>                     work, I'm finding the
>                     interactions between rabbitmq and beam with
>                     CheckpointMark to be
>                     fundamentally impossible to implement so I'm
>                     hoping for some input here.
>
>                     CheckpointMark itself must be Serializable,
>                     presumably this means it gets
>                     shuffled around between nodes. However 'Channel',
>                     the tunnel through
>                     which it communicates with Rabbit to ack messages
>                     and finalize the
>                     checkpoint, is non-Serializable. Like most other
>                     CheckpointMark
>                     implementations, Channel is 'transient'. When a
>                     new CheckpointMark is
>                     instantiated, it's given a Channel. If an existing
>                     one is supplied to
>                     the Reader's constructor (part of the
>                     'startReader()' interface), the
>                     channel is overwritten.
>
>                     *However*, Rabbit does not support 'ack'ing
>                     messages on a channel other
>                     than the one that consumed them in the first
>                     place. Attempting to do so
>                     results in a '406 (PRECONDITION-FAILED) - unknown
>                     delivery tag'. (See
>                     https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>
>                     ).
>
>                     Truthfully, I don't really understand how the
>                     current implementation is
>                     working; it seems like a happy accident. But I'm
>                     curious if someone
>                     could help me debug and implement how to bridge the
>                     re-usable/serializable CheckpointMark requirement
>                     in Beam with this
>                     limitation of Rabbit.
>
>                     Thanks,
>                     -Daniel Robert
>
>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Jan Lukavský <je...@seznam.cz>.
Hi, as I said, I didn't dig too deep into that, but what I saw was [1].

Generally, if RabbitMQ had no way to recover a subscription (which I don't
think is the case), then it would not just be incompatible with Beam - it
would be incompatible with any fault-tolerant semantics.

  

[1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle

  



Re: RabbitMQ and CheckpointMark feasibility

Posted by Daniel Robert <da...@acm.org>.
On 11/14/19 2:32 AM, Jan Lukavský wrote:
>
> Hi Danny,
>
> as Eugene pointed out, there are essentially two "modes of operation" 
> of CheckpointMark. It can:
>
>  a) be used to somehow restore state of a reader (in call to 
> UnboundedSource#createReader)
>
>  b) confirm processed elements in CheckpointMark#finalizeCheckpoint
>
> If your source doesn't provide a persistent position in the data stream 
> that can be referred to (and serialized - Kafka offsets would be an 
> example of this), then what you actually need to serialize is not the 
> channel, but a way to restore it - e.g. by opening a new channel with a 
> given 'consumer group name'. Then you just use this checkpoint to commit 
> your processed data in finalizeCheckpoint.
>
> Note that finalizeCheckpoint is not guaranteed to be called - that can 
> happen in cases when an error occurs and the source has to be rewound - 
> that is what the direct runner emulates with the probability of 
> 'readerReuseChance'.
>
> I'm reading the documentation of RabbitMQ very quickly, but if I 
> understand it correctly, then you have to create a subscription to the 
> broker, serialize the identifier of the subscription into the 
> CheckpointMark and then just recover the subscription in the call to 
> UnboundedSource#createReader. That should do the trick.
>
I have not seen any such documentation in rabbit. My understanding is it 
has to be the same, physical connection and channel. Can you cite the 
source you were looking at?

-Danny

> Hope this helps, sorry if I'm not using 100% correct RabbitMQ 
> terminology as I said, I'm not quite familiar with it.
>
> Best,
>
>  Jan
>
> On 11/14/19 5:26 AM, Daniel Robert wrote:
>>
>> I believe I've nailed down a situation that happens in practice that 
>> causes Beam and Rabbit to be incompatible. It seems that runners can 
>> and do make assumptions about the serializability (via Coder) of a 
>> CheckpointMark.
>>
>> To start, these are the semantics of RabbitMQ:
>>
>> - the client establishes a connection to the server
>> - client opens a channel on the connection
>> - messages are either pulled or pushed to the client from the server 
>> along this channel
>> - when messages are done processing, they are acknowledged 
>> *client-side* and must be acknowledged on the *same channel* that 
>> originally received the message.
>>
>> Since a channel (or any open connection) is non-serializable, it 
>> means that a CheckpointMark that has been serialized cannot ever be 
>> used to acknowledge these messages and correctly 'finalize' the 
>> checkpoint. It also, as previously discussed in this thread, implies 
>> a rabbit Reader cannot accept an existing CheckpointMark at all; the 
>> Reader and the CheckpointMark must share the same connection to the 
>> rabbit server ("channel").
>>
>> Next, I've found how DirectRunner (and presumably others) can attempt 
>> to serialize a CheckpointMark that has not been finalized. In 
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150, 
>> the DirectRunner applies a probability and if it hits, it sets the 
>> current reader to 'null' but retains the existing CheckpointMark, 
>> which it then attempts to pass to a new reader via a Coder.
>>
>> This puts the shard, the runner, and the reader with differing views 
>> of the world. In UnboundedReadEvaluatorFactory's processElement 
>> function, a call to getReader(shard) ( 
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132 
>> ) clones the shard's checkpoint mark and passes that to the new 
>> reader. The reader ignores it, creating its own, but even if it 
>> accepted it, it would be accepting a serialized CheckpointMark, which 
>> wouldn't work. Later, the runner calls finishRead ( 
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246 
>> ). The shard's CheckpointMark (unserialized; which should still be 
>> valid) is finalized. The reader's CheckpointMark (which may be a 
>> different instance) becomes the return value, which is referred to as 
>> "finishedCheckpoint" in the calling code, which is misleading at best 
>> and problematic at worst as *this* checkpoint has not been finalized.
>>
>> So, tl;dr: I cannot find any means of maintaining a persistent 
>> connection to the server for finalizing checkpoints that is safe 
>> across runners. If there's a guarantee all of the shards are on the 
>> same JVM instance, I could rely on global, static 
>> collections/instances as a workaround, but if other runners might 
>> serialize this across the wire, I'm stumped. The only workable 
>> situation I can think of right now is to proactively acknowledge 
>> messages as they are received and effectively no-op in 
>> finalizeCheckpoint. This is very different, semantically, and can 
>> lead to dropped messages if a pipeline doesn't finish processing the 
>> given message.
>>
>> Any help would be much appreciated.
>>
>> Thanks,
>> -Danny
>>
>> On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>>> Hi Daniel,
>>>
>>> This is probably insufficiently well documented. The CheckpointMark 
>>> is used for two purposes:
>>> 1) To persistently store some notion of how much of the stream has 
>>> been consumed, so that if something fails we can tell the underlying 
>>> streaming system where to start reading when we re-create the 
>>> reader. This is why CheckpointMark is Serializable. E.g. this makes 
>>> sense for Kafka.
>>> 2) To do acks - to let the underlying streaming system know that the 
>>> Beam pipeline will never need data up to this CheckpointMark. Acking 
>>> does not require serializability - runners call ack() on the same 
>>> in-memory instance of CheckpointMark that was produced by the 
>>> reader. E.g. this makes sense for RabbitMq or Pubsub.
>>>
>>> In practice, these two capabilities tend to be mutually exclusive: 
>>> some streaming systems can provide a serializable CheckpointMark, 
>>> some can do acks, some can do neither - but very few (or none) can 
>>> do both, and it's debatable whether it even makes sense for a system 
>>> to provide both capabilities: usually acking is an implicit form of 
>>> streaming-system-side checkpointing, i.e. when you re-create the 
>>> reader you don't actually need to carry over any information from an 
>>> old CheckpointMark - the necessary state (which records should be 
>>> delivered) is maintained on the streaming system side.
>>>
>>> These two are lumped together into one API simply because that was 
>>> the best design option we came up with (not for lack of trying, but 
>>> suggestions very much welcome - AFAIK nobody is happy with it).
>>>
>>> RabbitMQ is under #2 - it can't do serializable checkpoint marks, 
>>> but it can do acks. So you can simply ignore the non-serializability.
>>>
>>> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.robert@acm.org 
>>> <ma...@acm.org>> wrote:
>>>
>>>     (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x
>>>     library.
>>>     As part of this I switched to a pull-based API rather than the
>>>     previously-used push-based. This has caused some nebulous
>>>     problems so I
>>>     put up a correction PR that I think needs some eyes fairly
>>>     quickly as
>>>     I'd consider master to be broken for rabbitmq right now. The PR
>>>     keeps
>>>     the upgrade but reverts to the same push-based implementation as
>>>     in 4.x:
>>>     https://github.com/apache/beam/pull/9977 )
>>>
>>>     Regardless, in trying to get the pull-based API to work, I'm
>>>     finding the
>>>     interactions between rabbitmq and beam with CheckpointMark to be
>>>     fundamentally impossible to implement so I'm hoping for some
>>>     input here.
>>>
>>>     CheckpointMark itself must be Serializable, presumably this means
>>>     it gets
>>>     shuffled around between nodes. However 'Channel', the tunnel
>>>     through
>>>     which it communicates with Rabbit to ack messages and finalize the
>>>     checkpoint, is non-Serializable. Like most other CheckpointMark
>>>     implementations, Channel is 'transient'. When a new
>>>     CheckpointMark is
>>>     instantiated, it's given a Channel. If an existing one is
>>>     supplied to
>>>     the Reader's constructor (part of the 'startReader()'
>>>     interface), the
>>>     channel is overwritten.
>>>
>>>     *However*, Rabbit does not support 'ack'ing messages on a
>>>     channel other
>>>     than the one that consumed them in the first place. Attempting
>>>     to do so
>>>     results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'.
>>>     (See
>>>     https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>>
>>>     ).
>>>
>>>     Truthfully, I don't really understand how the current
>>>     implementation is
>>>     working; it seems like a happy accident. But I'm curious if someone
>>>     could help me debug and implement how to bridge the
>>>     re-usable/serializable CheckpointMark requirement in Beam with this
>>>     limitation of Rabbit.
>>>
>>>     Thanks,
>>>     -Daniel Robert
>>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Danny,

as Eugene pointed out, there are essentially two "modes of operation" of 
CheckpointMark. It can:

  a) be used to somehow restore state of a reader (in call to 
UnboundedSource#createReader)

  b) confirm processed elements in CheckpointMark#finalizeCheckpoint

If your source doesn't provide a persistent position in the data stream 
that can be referred to (and serialized - Kafka offsets would be an 
example of this), then what you actually need to serialize is not the 
channel, but a way to restore it - e.g. by opening a new channel with a 
given 'consumer group name'. Then you just use this checkpoint to commit 
your processed data in finalizeCheckpoint.

Note that finalizeCheckpoint is not guaranteed to be called - that can 
happen in cases when an error occurs and the source has to be rewound - 
that is what the direct runner emulates with the probability of 
'readerReuseChance'.

I'm reading the documentation of RabbitMQ very quickly, but if I 
understand it correctly, then you have to create a subscription to the 
broker, serialize the identifier of the subscription into the 
CheckpointMark and then just recover the subscription in the call to 
UnboundedSource#createReader. That should do the trick.
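
For illustration only, a rough and untested sketch of what I mean, with 
made-up names - the serialized state is just the restore information, 
while the Channel stays transient and only the live, in-memory instance 
can actually ack:

    import java.io.IOException;
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import com.rabbitmq.client.Channel;
    import org.apache.beam.sdk.io.UnboundedSource;

    public class RabbitCheckpointMark
        implements UnboundedSource.CheckpointMark, Serializable {
      // Serializable "how to restore" information (e.g. which queue to re-open).
      private final String queueName;
      // Delivery tags read since the last finalized checkpoint.
      private final List<Long> deliveryTags = new ArrayList<>();
      // Never serialized; supplied by the live reader that consumed the messages.
      private transient Channel channel;

      public RabbitCheckpointMark(String queueName, Channel channel) {
        this.queueName = queueName;
        this.channel = channel;
      }

      // Used by the source to re-subscribe in createReader after a restore.
      String getQueueName() {
        return queueName;
      }

      void add(long deliveryTag) {
        deliveryTags.add(deliveryTag);
      }

      @Override
      public void finalizeCheckpoint() throws IOException {
        // Acks must go through the same channel that consumed the messages, so
        // this is only meaningful on the in-memory instance held by the reader.
        for (long tag : deliveryTags) {
          channel.basicAck(tag, false);
        }
        deliveryTags.clear();
      }
    }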

Hope this helps, sorry if I'm not using 100% correct RabbitMQ 
terminology as I said, I'm not quite familiar with it.

Best,

  Jan

On 11/14/19 5:26 AM, Daniel Robert wrote:
>
> I believe I've nailed down a situation that happens in practice that 
> causes Beam and Rabbit to be incompatible. It seems that runners can 
> and do make assumptions about the serializability (via Coder) of a 
> CheckpointMark.
>
> To start, these are the semantics of RabbitMQ:
>
> - the client establishes a connection to the server
> - client opens a channel on the connection
> - messages are either pulled or pushed to the client from the server 
> along this channel
> - when messages are done processing, they are acknowledged 
> *client-side* and must be acknowledged on the *same channel* that 
> originally received the message.
>
> Since a channel (or any open connection) is non-serializable, it means 
> that a CheckpointMark that has been serialized cannot ever be used to 
> acknowledge these messages and correctly 'finalize' the checkpoint. It 
> also, as previously discussed in this thread, implies a rabbit Reader 
> cannot accept an existing CheckpointMark at all; the Reader and the 
> CheckpointMark must share the same connection to the rabbit server 
> ("channel").
>
> Next, I've found how DirectRunner (and presumably others) can attempt 
> to serialize a CheckpointMark that has not been finalized. In 
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150, 
> the DirectRunner applies a probability and if it hits, it sets the 
> current reader to 'null' but retains the existing CheckpointMark, 
> which it then attempts to pass to a new reader via a Coder.
>
> This puts the shard, the runner, and the reader with differing views 
> of the world. In UnboundedReadEvaluatorFactory's processElement 
> function, a call to getReader(shard) ( 
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132 
> ) clones the shard's checkpoint mark and passes that to the new 
> reader. The reader ignores it, creating its own, but even if it 
> accepted it, it would be accepting a serialized CheckpointMark, which 
> wouldn't work. Later, the runner calls finishRead ( 
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246 
> ). The shard's CheckpointMark (unserialized; which should still be 
> valid) is finalized. The reader's CheckpointMark (which may be a 
> different instance) becomes the return value, which is referred to as 
> "finishedCheckpoint" in the calling code, which is misleading at best 
> and problematic at worst as *this* checkpoint has not been finalized.
>
> So, tl;dr: I cannot find any means of maintaining a persistent 
> connection to the server for finalizing checkpoints that is safe 
> across runners. If there's a guarantee all of the shards are on the 
> same JVM instance, I could rely on global, static 
> collections/instances as a workaround, but if other runners might 
> serialize this across the wire, I'm stumped. The only workable 
> situation I can think of right now is to proactively acknowledge 
> messages as they are received and effectively no-op in 
> finalizeCheckpoint. This is very different, semantically, and can lead 
> to dropped messages if a pipeline doesn't finish processing the given 
> message.
>
> Any help would be much appreciated.
>
> Thanks,
> -Danny
>
> On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>> Hi Daniel,
>>
>> This is probably insufficiently well documented. The CheckpointMark 
>> is used for two purposes:
>> 1) To persistently store some notion of how much of the stream has 
>> been consumed, so that if something fails we can tell the underlying 
>> streaming system where to start reading when we re-create the reader. 
>> This is why CheckpointMark is Serializable. E.g. this makes sense for 
>> Kafka.
>> 2) To do acks - to let the underlying streaming system know that the 
>> Beam pipeline will never need data up to this CheckpointMark. Acking 
>> does not require serializability - runners call ack() on the same 
>> in-memory instance of CheckpointMark that was produced by the reader. 
>> E.g. this makes sense for RabbitMq or Pubsub.
>>
>> In practice, these two capabilities tend to be mutually exclusive: 
>> some streaming systems can provide a serializable CheckpointMark, 
>> some can do acks, some can do neither - but very few (or none) can do 
>> both, and it's debatable whether it even makes sense for a system to 
>> provide both capabilities: usually acking is an implicit form of 
>> streaming-system-side checkpointing, i.e. when you re-create the 
>> reader you don't actually need to carry over any information from an 
>> old CheckpointMark - the necessary state (which records should be 
>> delivered) is maintained on the streaming system side.
>>
>> These two are lumped together into one API simply because that was 
>> the best design option we came up with (not for lack of trying, but 
>> suggestions very much welcome - AFAIK nobody is happy with it).
>>
>> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but 
>> it can do acks. So you can simply ignore the non-serializability.
>>
>> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.robert@acm.org 
>> <ma...@acm.org>> wrote:
>>
>>     (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x
>>     library.
>>     As part of this I switched to a pull-based API rather than the
>>     previously-used push-based. This has caused some nebulous
>>     problems so I
>>     put up a correction PR that I think needs some eyes fairly
>>     quickly as
>>     I'd consider master to be broken for rabbitmq right now. The PR
>>     keeps
>>     the upgrade but reverts to the same push-based implementation as
>>     in 4.x:
>>     https://github.com/apache/beam/pull/9977 )
>>
>>     Regardless, in trying to get the pull-based API to work, I'm
>>     finding the
>>     interactions between rabbitmq and beam with CheckpointMark to be
>>     fundamentally impossible to implement so I'm hoping for some
>>     input here.
>>
>>     CheckpointMark itself must be Serializable, presumably this means
>>     it gets
>>     shuffled around between nodes. However 'Channel', the tunnel through
>>     which it communicates with Rabbit to ack messages and finalize the
>>     checkpoint, is non-Serializable. Like most other CheckpointMark
>>     implementations, Channel is 'transient'. When a new
>>     CheckpointMark is
>>     instantiated, it's given a Channel. If an existing one is
>>     supplied to
>>     the Reader's constructor (part of the 'startReader()' interface),
>>     the
>>     channel is overwritten.
>>
>>     *However*, Rabbit does not support 'ack'ing messages on a channel
>>     other
>>     than the one that consumed them in the first place. Attempting to
>>     do so
>>     results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'.
>>     (See
>>     https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>
>>     ).
>>
>>     Truthfully, I don't really understand how the current
>>     implementation is
>>     working; it seems like a happy accident. But I'm curious if someone
>>     could help me debug and implement how to bridge the
>>     re-usable/serializable CheckpointMark requirement in Beam with this
>>     limitation of Rabbit.
>>
>>     Thanks,
>>     -Daniel Robert
>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Reuven Lax <re...@google.com>.
Immediately after a source, the window is the Global window, which means
you will get global deduplication.
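
(For completeness: the hook on the source side is requiresDeduping() plus a 
per-record id from the reader. A rough, untested sketch - 'currentMessage' 
is a hypothetical field holding the last delivery:)

    // In a hypothetical RabbitMQ UnboundedSource subclass:
    @Override
    public boolean requiresDeduping() {
      return true; // ask the runner to drop records whose id has already been seen
    }

    // In the matching UnboundedReader subclass:
    @Override
    public byte[] getCurrentRecordId() {
      // Assumes publishers set a stable, unique AMQP messageId property.
      return currentMessage.getProps().getMessageId()
          .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }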

On Thu, Nov 14, 2019 at 12:50 PM Daniel Robert <da...@acm.org>
wrote:

> Alright, thanks everybody. I'm really appreciative of the conversation
> here. I think I see where my disconnect is and how this might all work
> together for me. There are some bugs in the current rabbit implementation
> that I think have confused my understanding of the intended semantics. I'm
> coming around to seeing how such a system with rabbit's restrictions can
> work properly in Beam (I'd totally forgotten about 'dedupe' support in
> Beam) but I want to clarify some implementation questions after pulling
> everyone's notes together.
>
> RabbitMQ reader should not bother accepting an existing CheckpointMark in
> its constructor (in 'ack-based' systems this is unnecessary per Eugene's
> original reply). It should construct its own CheckpointMark at construction
> time and use it throughout its lifecycle.
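>
> Something like the following is probably all the source needs (a rough 
> sketch with hypothetical names) - the restored mark is accepted but 
> ignored, since the broker will simply redeliver anything that was never 
> acked:
>
>     @Override
>     public UnboundedReader<RabbitMqMessage> createReader(
>         PipelineOptions options, RabbitCheckpointMark ignored) {
>       return new RabbitMqReader(this, options);
>     }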
>
> At some point later, the CheckpointMark will be 'finalized'. If this
> CheckpointMark has been Serialized (via Coder or otherwise) or its
> underlying connection has been severed, this step will fail. This would
> mean at some point the messages are redelivered to Beam on some other
> Reader, so no data loss. If it has not been serialized, the acks will take
> place just fine, even if much later.
>
> If the system is using processing-time as event-time, however, the
> redelivery of these messages would effectively change the ordering and
> potentially the window they arrived in. I *believe* that Beam deduping
> seems to be managed per-window so if 'finalizeCheckpoint' is attempted (and
> fails) would these messages appear in a new window?
>
> Perhaps my questions now are:
> - how should a CheckpointMark communicate failure to Beam?
> - how does Beam handle a CheckpointMark.finalizeCheckpoint failure, if the
> API dictates such a thing?
> - is there a provision that would need to be made for processing-time
> sources that can fail a checkpointmark.finalizeCheckpoint call? (I'm
> nervous redelivered messages would appear in another window)
> - What is the relationship lifecycle-wise between a CheckpointMark and a
> Reader? My understanding is a CheckpointMark may outlive a Reader, is that
> correct?
>
> Thanks for bearing with me everyone. It feels a bit unfortunate my first
> foray into beam is reliant on this rabbit connector but I'm learning a lot
> and I'm very grateful for the help. PRs pending once I get this all
> straightened out in my head.
>
> -Danny
> On 11/14/19 2:35 PM, Eugene Kirpichov wrote:
>
> Hi Daniel,
>
>
> On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert <da...@acm.org>
> wrote:
>
>> I believe I've nailed down a situation that happens in practice that
>> causes Beam and Rabbit to be incompatible. It seems that runners can and do
>> make assumptions about the serializability (via Coder) of a CheckpointMark.
>>
>> To start, these are the semantics of RabbitMQ:
>>
>> - the client establishes a connection to the server
>> - client opens a channel on the connection
>> - messages are either pulled or pushed to the client from the server
>> along this channel
>> - when messages are done processing, they are acknowledged *client-side*
>> and must be acknowledged on the *same channel* that originally received the
>> message.
>>
>> Since a channel (or any open connection) is non-serializable, it means
>> that a CheckpointMark that has been serialized cannot ever be used to
>> acknowledge these messages and correctly 'finalize' the checkpoint. It
>> also, as previously discussed in this thread, implies a rabbit Reader
>> cannot accept an existing CheckpointMark at all; the Reader and the
>> CheckpointMark must share the same connection to the rabbit server
>> ("channel").
>>
> This is correct.
>
>
>> Next, I've found how DirectRunner (and presumably others) can attempt to
>> serialize a CheckpointMark that has not been finalized. In
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
>> the DirectRunner applies a probability and if it hits, it sets the current
>> reader to 'null' but retains the existing CheckpointMark, which it then
>> attempts to pass to a new reader via a Coder.
>>
> Correct, this simulates a failure scenario:
> - Runner was reading the source and, after finalizing a bunch of previous
> CheckpointMarks, obtained a new one and serialized it so things can be
> restored in case of failure
> - A failure happened before the current CheckpointMark could be finalized,
> which means Beam was not able to guarantee that elements after the
> last-finalized mark have been durably processed, so we may need to re-read
> them, so runner recreates a reader from the current mark.
>
>
>> This puts the shard, the runner, and the reader with differing views of
>> the world. In UnboundedReadEvaluatorFactory's processElement function, a
>> call to getReader(shard) (
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
>> ) clones the shard's checkpoint mark and passes that to the new reader. The
>> reader ignores it, creating its own, but even if it accepted it, it would
>> be accepting a serialized CheckpointMark, which wouldn't work.
>>
> Correct in the sense that for a RabbitMQ reader, a CheckpointMark doesn't
> affect what the reader will read: it depends only on the broker's internal
> state (which in turn depends on which messages have been acked by previous
> finalized CheckpointMark's).
>
>> Later, the runner calls finishRead (
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
>> ). The shard's CheckpointMark (unserialized; which should still be valid)
>> is finalized. The reader's CheckpointMark (which may be a different
>> instance) becomes the return value, which is referred to as
>> "finishedCheckpoint" in the calling code, which is misleading at best and
>> problematic at worst as *this* checkpoint has not been finalized.
>>
> I'm not following what is the problem here. In that code, "oldMark" is the
> last checkpoint mark to be finalized - calling finalizeCheckpoint on it
> signals that Beam has durably processed all the messages read from the
> reader until that mark. "mark" (the new one) represents the state of the
> reader after the last finalized mark, so it should not be finalized.
>
> I.e. AFAIR in a hypothetical runner (which DirectRunner tries to emulate)
> things go like this:
>
> Create a reader
> Let mark M1 = reader.getCheckpointMark()
> Durably persist M1 as the "restore point" of this reader
> ...read messages A B C from reader and durably process them...
> Finalize M1 (acks A B C)
> Let mark M2 = reader.getCheckpointMark()
> Durably persist M2 as the "restore point" of this reader
> ...read messages D E F and durably process them...
> Finalize M2 (acks D E F)
>
> Now let's imagine a failure.
> Durably persist M2 as the "restore point" of this reader
> ...read messages D E, and then a failure happens
> Recreate reader from M2 (reader ignores M2 but it doesn't matter)
> Since M2 was not finalized, messages D E F were not acked, and RabbitMQ
> will redeliver them to this reader. D E will be processed twice, but only
> the results of this new processing will be durable.
> Finalize M2 (acks D E F)
> Etc.
>
> Basically you can think of this as a series of micro-bundles, where
> micro-bundles are delimited by checkpoint marks, and each micro-bundle is a
> runner-side transaction which either commits or discards the results of
> processing all messages in this micro-bundle. After a micro-bundle [M1, M2)
> commits, the runner calls M1.finalizeCheckpointMark() and persists M2 as
> the new restore point in case of failure.
>
>> So, tl;dr: I cannot find any means of maintaining a persistent connection
>> to the server for finalizing checkpoints that is safe across runners. If
>> there's a guarantee all of the shards are on the same JVM instance, I could
>> rely on global, static collections/instances as a workaround, but if other
>> runners might serialize this across the wire, I'm stumped. The only
>> workable situation I can think of right now is to proactively acknowledge
>> messages as they are received and effectively no-op in finalizeCheckpoint.
>> This is very different, semantically, and can lead to dropped messages if a
>> pipeline doesn't finish processing the given message.
>>
>> Any help would be much appreciated.
>>
> If I'm misunderstanding something above, could you describe in detail a
> scenario that leads to message loss, or (less severe) to more-than-once
> durable processing of the same message?
>
>
>> Thanks,
>> -Danny
>> On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>>
>> Hi Daniel,
>>
>> This is probably insufficiently well documented. The CheckpointMark is
>> used for two purposes:
>> 1) To persistently store some notion of how much of the stream has been
>> consumed, so that if something fails we can tell the underlying streaming
>> system where to start reading when we re-create the reader. This is why
>> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
>> 2) To do acks - to let the underlying streaming system know that the Beam
>> pipeline will never need data up to this CheckpointMark. Acking does not
>> require serializability - runners call ack() on the same in-memory instance
>> of CheckpointMark that was produced by the reader. E.g. this makes sense
>> for RabbitMq or Pubsub.
>>
>> In practice, these two capabilities tend to be mutually exclusive: some
>> streaming systems can provide a serializable CheckpointMark, some can do
>> acks, some can do neither - but very few (or none) can do both, and it's
>> debatable whether it even makes sense for a system to provide both
>> capabilities: usually acking is an implicit form of streaming-system-side
>> checkpointing, i.e. when you re-create the reader you don't actually need
>> to carry over any information from an old CheckpointMark - the necessary
>> state (which records should be delivered) is maintained on the streaming
>> system side.
>>
>> These two are lumped together into one API simply because that was the
>> best design option we came up with (not for lack of trying, but suggestions
>> very much welcome - AFAIK nobody is happy with it).
>>
>> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
>> can do acks. So you can simply ignore the non-serializability.
>>
>> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <da...@acm.org>
>> wrote:
>>
>>> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
>>> As part of this I switched to a pull-based API rather than the
>>> previously-used push-based. This has caused some nebulous problems so I
>>> put up a correction PR that I think needs some eyes fairly quickly as
>>> I'd consider master to be broken for rabbitmq right now. The PR keeps
>>> the upgrade but reverts to the same push-based implementation as in 4.x:
>>> https://github.com/apache/beam/pull/9977 )
>>>
>>> Regardless, in trying to get the pull-based API to work, I'm finding the
>>> interactions between rabbitmq and beam with CheckpointMark to be
>>> fundamentally impossible to implement so I'm hoping for some input here.
>>>
>>> CheckpointMark itself must be Serializable, presumably this means it gets
>>> shuffled around between nodes. However 'Channel', the tunnel through
>>> which it communicates with Rabbit to ack messages and finalize the
>>> checkpoint, is non-Serializable. Like most other CheckpointMark
>>> implementations, Channel is 'transient'. When a new CheckpointMark is
>>> instantiated, it's given a Channel. If an existing one is supplied to
>>> the Reader's constructor (part of the 'startReader()' interface), the
>>> channel is overwritten.
>>>
>>> *However*, Rabbit does not support 'ack'ing messages on a channel other
>>> than the one that consumed them in the first place. Attempting to do so
>>> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>>>
>>> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>> ).
>>>
>>> Truthfully, I don't really understand how the current implementation is
>>> working; it seems like a happy accident. But I'm curious if someone
>>> could help me debug and implement how to bridge the
>>> re-usable/serializable CheckpointMark requirement in Beam with this
>>> limitation of Rabbit.
>>>
>>> Thanks,
>>> -Daniel Robert
>>>
>>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Kenneth Knowles <ke...@apache.org>.
I just took a look at the PR - it is, indeed, huge. But it is probably not
too hard to review as it is mostly fresh code. It is true that there hasn't
been a ton of work on RabbitMQ so maybe the reviewer isn't obvious. There are
3 committers on this thread who seem to have the expertise and interest...

Kenn

On Mon, Jan 6, 2020 at 2:25 PM Daniel Robert <da...@acm.org> wrote:

> Alright, a bit late but this took me a while.
>
> Thanks for all the input so far. I have rewritten much of the RabbitMq IO
> connector and have it ready to go in a draft pr:
> https://github.com/apache/beam/pull/10509
>
> This should incorporate a lot of what's been discussed here, in terms of
> watermarking, serialization, error handling, etc. It also clarifies/cleans
> up a lot of very confusing documentation/api settings pertaining to using
> 'queues vs exchanges' and adds clarifying documentation on various valid
> AMQP paradigms.
>
> Watermarking/timestamp management is mostly stolen from KafkaIO and
> modified as appropriate.
>
> This also does a lot to improve resource management in terms of Connection
> and Channel usage, largely modeled after JdbcIO's ConnectionHandlerProvider
> concept.
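>
> The core of that idea, as a rough sketch (hypothetical class, not the 
> actual PR code): serialize only the recipe for building connections, 
> never a live Connection or Channel:
>
>     import java.io.Serializable;
>     import com.rabbitmq.client.Connection;
>     import com.rabbitmq.client.ConnectionFactory;
>
>     class ConnectionProvider implements Serializable {
>       private final String uri; // e.g. amqp://user:pass@host:5672/vhost
>
>       ConnectionProvider(String uri) {
>         this.uri = uri;
>       }
>
>       Connection open() throws Exception {
>         ConnectionFactory factory = new ConnectionFactory();
>         factory.setUri(uri);
>         factory.setAutomaticRecoveryEnabled(true);
>         return factory.newConnection();
>       }
>     }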
>
> I'm not entirely sure how best to proceed from here, hence the email. It's
> a huge PR, but it has no specific backing ticket (it should), and
> historically there haven't been many eyes on RabbitMq PRs.
>
> Thanks,
> -Danny
> On 11/14/19 4:13 PM, Jan Lukavský wrote:
>
> On 11/14/19 9:50 PM, Daniel Robert wrote:
>
> Alright, thanks everybody. I'm really appreciative of the conversation
> here. I think I see where my disconnect is and how this might all work
> together for me. There are some bugs in the current rabbit implementation
> that I think have confused my understanding of the intended semantics. I'm
> coming around to seeing how such a system with rabbit's restrictions can
> work properly in Beam (I'd totally forgotten about 'dedupe' support in
> Beam) but I want to clarify some implementation questions after pulling
> everyone's notes together.
>
> RabbitMQ reader should not bother accepting an existing CheckpointMark in
> its constructor (in 'ack-based' systems this is unnecessary per Eugene's
> original reply). It should construct its own CheckpointMark at construction
> time and use it throughout its lifecycle.
>
> At some point later, the CheckpointMark will be 'finalized'. If this
> CheckpointMark has been Serialized (via Coder or otherwise) or its
> underlying connection has been severed, this step will fail. This would
> mean at some point the messages are redelivered to Beam on some other
> Reader, so no data loss. If it has not been serialized, the acks will take
> place just fine, even if much later.
>
> If the system is using processing-time as event-time, however, the
> redelivery of these messages would effectively change the ordering and
> potentially the window they arrived in. I *believe* that Beam deduping
> seems to be managed per-window so if 'finalizeCheckpoint' is attempted (and
> fails) would these messages appear in a new window?
>
> This is very likely to happen with any source that assigns something like
> *now* as the event time. That is ill-defined, and if the source cannot
> provide some retry-persistent estimate of the real event time, then I'd
> suggest forcing the user to specify a UDF to extract the event time from
> the payload. Everything else would probably break (at least if any
> timestamp-related windowing is used in the pipeline).
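>
> For illustration, a hypothetical API sketch (withTimestampFn is not part 
> of RabbitMqIO today), assuming a payload that happens to carry an 
> ISO-8601 timestamp:
>
>     RabbitMqIO.Read read =
>         RabbitMqIO.read()
>             .withQueue("my-queue")
>             .withTimestampFn(
>                 (SerializableFunction<RabbitMqMessage, Instant>)
>                     msg -> Instant.parse(
>                         new String(msg.getBody(), StandardCharsets.UTF_8)));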
>
> Perhaps my questions now are:
> - how should a CheckpointMark communicate failure to Beam?
>
> An exception thrown there should fail the checkpoint and therefore
> trigger a retry of everything from the last checkpoint.
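>
> E.g., with the checkpoint mark sketched earlier in this thread, that 
> could look like (again, illustration only):
>
>     @Override
>     public void finalizeCheckpoint() throws IOException {
>       if (channel == null || !channel.isOpen()) {
>         // Surfacing the failure lets the runner retry from the last
>         // finalized checkpoint instead of silently dropping the acks.
>         throw new IOException("channel that consumed these messages is gone; cannot ack");
>       }
>       for (long tag : deliveryTags) {
>         channel.basicAck(tag, false);
>       }
>       deliveryTags.clear();
>     }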
>
> - how does Beam handle a CheckpointMark.finalizeCheckpoint failure, if the
> API dictates such a thing?
>
> See above.
>
> - is there a provision that would need to be made for processing-time
> sources that can fail a checkpointmark.finalizeCheckpoint call? (I'm
> nervous redelivered messages would appear in another window)
>
> You are nervous for a reason. :) I strongly believe a processing-time
> source should be considered an anti-pattern, at least in situations where
> there is any time manipulation downstream (time-windows, stateful
> processing, ...).
>
> - What is the relationship lifecycle-wise between a CheckpointMark and a
> Reader? My understanding is a CheckpointMark may outlive a Reader, is that
> correct?
>
> Definitely. But the same instance bound to the lifecycle of the reader
> would be used to finalizeCheckpoint (if that ever happens).
>
> Thanks for bearing with me everyone. It feels a bit unfortunate my first
> foray into beam is reliant on this rabbit connector but I'm learning a lot
> and I'm very grateful for the help. PRs pending once I get this all
> straightened out in my head.
>
> -Danny
> On 11/14/19 2:35 PM, Eugene Kirpichov wrote:
>
> Hi Daniel,
>
>
> On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert <da...@acm.org>
> wrote:
>
>> I believe I've nailed down a situation that happens in practice that
>> causes Beam and Rabbit to be incompatible. It seems that runners can and do
>> make assumptions about the serializability (via Coder) of a CheckpointMark.
>>
>> To start, these are the semantics of RabbitMQ:
>>
>> - the client establishes a connection to the server
>> - client opens a channel on the connection
>> - messages are either pulled or pushed to the client from the server
>> along this channel
>> - when messages are done processing, they are acknowledged *client-side*
>> and must be acknowledged on the *same channel* that originally received the
>> message.
>>
>> Since a channel (or any open connection) is non-serializable, it means
>> that a CheckpointMark that has been serialized cannot ever be used to
>> acknowledge these messages and correctly 'finalize' the checkpoint. It
>> also, as previously discussed in this thread, implies a rabbit Reader
>> cannot accept an existing CheckpointMark at all; the Reader and the
>> CheckpointMark must share the same connection to the rabbit server
>> ("channel").
>>
> This is correct.
>
>
>> Next, I've found how DirectRunner (and presumably others) can attempt to
>> serialize a CheckpointMark that has not been finalized. In
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
>> the DirectRunner applies a probability and if it hits, it sets the current
>> reader to 'null' but retains the existing CheckpointMark, which it then
>> attempts to pass to a new reader via a Coder.
>>
> Correct, this simulates a failure scenario:
> - Runner was reading the source and, after finalizing a bunch of previous
> CheckpointMarks, obtained a new one and serialized it so things can be
> restored in case of failure
> - A failure happened before the current CheckpointMark could be finalized,
> which means Beam was not able to guarantee that elements after the
> last-finalized mark have been durably processed, so we may need to re-read
> them, so runner recreates a reader from the current mark.
>
>
>> This puts the shard, the runner, and the reader with differing views of
>> the world. In UnboundedReadEvaluatorFactory's processElement function, a
>> call to getReader(shard) (
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
>> ) clones the shard's checkpoint mark and passes that to the new reader. The
>> reader ignores it, creating its own, but even if it accepted it, it would
>> be accepting a serialized CheckpointMark, which wouldn't work.
>>
> Correct in the sense that for a RabbitMQ reader, a CheckpointMark doesn't
> affect what the reader will read: it depends only on the broker's internal
> state (which in turn depends on which messages have been acked by previous
> finalized CheckpointMark's).
>
>> Later, the runner calls finishRead (
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
>> ). The shard's CheckpointMark (unserialized; which should still be valid)
>> is finalized. The reader's CheckpointMark (which may be a different
>> instance) becomes the return value, which is referred to as
>> "finishedCheckpoint" in the calling code, which is misleading at best and
>> problematic at worst as *this* checkpoint has not been finalized.
>>
> I'm not following what is the problem here. In that code, "oldMark" is the
> last checkpoint mark to be finalized - calling finalizeCheckpoint on it
> signals that Beam has durably processed all the messages read from the
> reader until that mark. "mark" (the new one) represents the state of the
> reader after the last finalized mark, so it should not be finalized.
>
> I.e. AFAIR in a hypothetical runner (which DirectRunner tries to emulate)
> things go like this:
>
> Create a reader
> Let mark M1 = reader.getCheckpointMark()
> Durably persist M1 as the "restore point" of this reader
> ...read messages A B C from reader and durably process them...
> Finalize M1 (acks A B C)
> Let mark M2 = reader.getCheckpointMark()
> Durably persist M2 as the "restore point" of this reader
> ...read messages D E F and durably process them...
> Finalize M2 (acks D E F)
>
> Now let's imagine a failure.
> Durably persist M2 as the "restore point" of this reader
> ...read messages D E, and then a failure happens
> Recreate reader from M2 (reader ignores M2 but it doesn't matter)
> Since M2 was not finalized, messages D E F were not acked, and RabbitMQ
> will redeliver them to this reader. D E will be processed twice, but only
> the results of this new processing will be durable.
> Finalize M2 (acks D E F)
> Etc.
>
> Basically you can think of this as a series of micro-bundles, where
> micro-bundles are delimited by checkpoint marks, and each micro-bundle is a
> runner-side transaction which either commits or discards the results of
> processing all messages in this micro-bundle. After a micro-bundle [M1, M2)
> commits, the runner calls M1.finalizeCheckpointMark() and persists M2 as
> the new restore point in case of failure.
>
>> So, tl;dr: I cannot find any means of maintaining a persistent connection
>> to the server for finalizing checkpoints that is safe across runners. If
>> there's a guarantee all of the shards are on the same JVM instance, I could
>> rely on global, static collections/instances as a workaround, but if other
>> runners might serialize this across the wire, I'm stumped. The only
>> workable situation I can think of right now is to proactively acknowledge
>> messages as they are received and effectively no-op in finalizeCheckpoint.
>> This is very different, semantically, and can lead to dropped messages if a
>> pipeline doesn't finish processing the given message.
>>
>> Any help would be much appreciated.
>>
> If I'm misunderstanding something above, could you describe in detail a
> scenario that leads to message loss, or (less severe) to more-than-once
> durable processing of the same message?
>
>
>> Thanks,
>> -Danny
>> On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>>
>> Hi Daniel,
>>
>> This is probably insufficiently well documented. The CheckpointMark is
>> used for two purposes:
>> 1) To persistently store some notion of how much of the stream has been
>> consumed, so that if something fails we can tell the underlying streaming
>> system where to start reading when we re-create the reader. This is why
>> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
>> 2) To do acks - to let the underlying streaming system know that the Beam
>> pipeline will never need data up to this CheckpointMark. Acking does not
>> require serializability - runners call ack() on the same in-memory instance
>> of CheckpointMark that was produced by the reader. E.g. this makes sense
>> for RabbitMq or Pubsub.
>>
>> In practice, these two capabilities tend to be mutually exclusive: some
>> streaming systems can provide a serializable CheckpointMark, some can do
>> acks, some can do neither - but very few (or none) can do both, and it's
>> debatable whether it even makes sense for a system to provide both
>> capabilities: usually acking is an implicit form of streaming-system-side
>> checkpointing, i.e. when you re-create the reader you don't actually need
>> to carry over any information from an old CheckpointMark - the necessary
>> state (which records should be delivered) is maintained on the streaming
>> system side.
>>
>> These two are lumped together into one API simply because that was the
>> best design option we came up with (not for lack of trying, but suggestions
>> very much welcome - AFAIK nobody is happy with it).
>>
>> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
>> can do acks. So you can simply ignore the non-serializability.
>>
>> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <da...@acm.org>
>> wrote:
>>
>>> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
>>> As part of this I switched to a pull-based API rather than the
>>> previously-used push-based. This has caused some nebulous problems so
>>> put up a correction PR that I think needs some eyes fairly quickly as
>>> I'd consider master to be broken for rabbitmq right now. The PR keeps
>>> the upgrade but reverts to the same push-based implementation as in 4.x:
>>> https://github.com/apache/beam/pull/9977 )
>>>
>>> Regardless, in trying to get the pull-based API to work, I'm finding the
>>> interactions between rabbitmq and beam with CheckpointMark to be
>>> fundamentally impossible to implement so I'm hoping for some input here.
>>>
>>> CheckointMark itself must be Serializable, presumably this means it gets
>>> shuffled around between nodes. However 'Channel', the tunnel through
>>> which it communicates with Rabbit to ack messages and finalize the
>>> checkpoint, is non-Serializable. Like most other CheckpointMark
>>> implementations, Channel is 'transient'. When a new CheckpointMark is
>>> instantiated, it's given a Channel. If an existing one is supplied to
>>> the Reader's constructor (part of the 'startReader()' interface), the
>>> channel is overwritten.
>>>
>>> *However*, Rabbit does not support 'ack'ing messages on a channel other
>>> than the one that consumed them in the first place. Attempting to do so
>>> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>>>
>>> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>> ).
>>>
>>> Truthfully, I don't really understand how the current implementation is
>>> working; it seems like a happy accident. But I'm curious if someone
>>> could help me debug and implement how to bridge the
>>> re-usable/serializable CheckpointMark requirement in Beam with this
>>> limitation of Rabbit.
>>>
>>> Thanks,
>>> -Daniel Robert
>>>
>>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Daniel Robert <da...@acm.org>.
Alright, a bit late but this took me a while.

Thanks for all the input so far. I have rewritten much of the RabbitMq 
IO connector and have it ready to go in a draft PR: 
https://github.com/apache/beam/pull/10509

This should incorporate a lot of what's been discussed here, in terms of 
watermarking, serialization, error handling, etc. It also 
clarifies/cleans up a lot of very confusing documentation/API settings 
pertaining to using 'queues vs exchanges' and adds clarifying 
documentation on various valid AMQP paradigms.
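
To give a feel for the two paradigms (the builder method names here are 
only indicative of the direction; the draft PR is the source of truth 
for the actual API), where 'p' is the Pipeline:

    // Indicative sketch only -- see the draft PR for the real builders.
    // 1) Consume from an existing, externally-managed queue:
    p.apply(RabbitMqIO.read()
        .withUri("amqp://guest:guest@localhost:5672/")
        .withQueue("orders"));

    // 2) Declare a temporary queue bound to an exchange (pub/sub style):
    p.apply(RabbitMqIO.read()
        .withUri("amqp://guest:guest@localhost:5672/")
        .withExchange("events", "fanout", ""));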

Watermarking/timestamp management is mostly stolen from KafkaIO and 
modified as appropriate.
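
Conceptually (this is an illustrative sketch with made-up names, not the 
code in the PR, which follows KafkaIO more closely) the reader tracks 
the oldest timestamp of any message that has been read but not yet 
finalized, and only advances the watermark past it once those messages 
are acked:

    import java.util.TreeSet;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    /** Illustrative watermark tracker; names are placeholders. */
    class WatermarkTracker {
      // Timestamps of messages read but not yet covered by a finalized
      // mark. (A real implementation needs a multiset to cope with
      // duplicate timestamps; omitted for brevity.)
      private final TreeSet<Instant> pending = new TreeSet<>();
      private Instant watermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

      synchronized void recordRead(Instant eventTime) {
        pending.add(eventTime);
      }

      synchronized void recordFinalized(Instant upToInclusive) {
        // Drop everything the finalized mark covered, then advance the
        // watermark to the oldest still-pending timestamp.
        pending.headSet(upToInclusive, true).clear();
        Instant candidate = pending.isEmpty() ? upToInclusive : pending.first();
        if (candidate.isAfter(watermark)) {
          watermark = candidate;
        }
      }

      synchronized Instant currentWatermark() {
        return watermark;
      }
    }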

This also does a lot to improve resource management in terms of 
Connection and Channel usage, largely modeled after JdbcIO's 
ConnectionHandlerProvider concept.
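
The shape of that (again an illustrative sketch with made-up names; the 
PR mirrors JdbcIO's provider pattern) is a Serializable factory that 
each worker uses to lazily open its own Connection and Channel, neither 
of which ever travels over the wire:

    import java.io.Serializable;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    /** Illustrative serializable provider of RabbitMQ channels. */
    class ChannelProvider implements Serializable {
      private final String uri;
      private transient Connection connection;
      private transient Channel channel;

      ChannelProvider(String uri) {
        this.uri = uri;
      }

      // Opened lazily on the worker; the transient fields are never
      // serialized, so each worker gets its own channel.
      synchronized Channel channel() throws Exception {
        if (channel == null || !channel.isOpen()) {
          ConnectionFactory factory = new ConnectionFactory();
          factory.setUri(uri);
          connection = factory.newConnection();
          channel = connection.createChannel();
        }
        return channel;
      }

      synchronized void close() throws Exception {
        if (channel != null && channel.isOpen()) {
          channel.close();
        }
        if (connection != null && connection.isOpen()) {
          connection.close();
        }
      }
    }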

I'm not entirely sure how best to proceed from here, hence the email. 
It's a huge PR, but it has no specific backing ticket (it should have one), and 
historically there haven't been many eyes on RabbitMq PRs.

Thanks,
-Danny

On 11/14/19 4:13 PM, Jan Lukavský wrote:
>
> On 11/14/19 9:50 PM, Daniel Robert wrote:
>
>> Alright, thanks everybody. I'm really appreciative of the 
>> conversation here. I think I see where my disconnect is and how this 
>> might all work together for me. There are some bugs in the current 
>> rabbit implementation that I think have confused my understanding of 
>> the intended semantics. I'm coming around to seeing how such a system 
>> with rabbit's restrictions can work properly in Beam (I'd totally 
>> forgotten about 'dedupe' support in Beam) but I want to clarify some 
>> implementation questions after pulling everyone's notes together.
>>
>> RabbitMQ reader should not bother accepting an existing 
>> CheckpointMark in its constructor (in 'ack-based' systems this is 
>> unnecessary per Eugene's original reply). It should construct its own 
>> CheckpointMark at construction time and use it throughout its lifecycle.
>>
>> At some point later, the CheckpointMark will be 'finalized'. If this 
>> CheckpointMark has been Serialized (via Coder or otherwise) or its 
>> underlying connection has been severed, this step will fail. This 
>> would mean at some point the messages are redelivered to Beam on some 
>> other Reader, so no data loss. If it has not been serialized, the 
>> acks will take place just fine, even if much later.
>>
>> If the system is using processing-time as event-time, however, the 
>> redelivery of these messages would effectively change the ordering 
>> and potentially the window they arrived in. I *believe* that Beam 
>> deduping seems to be managed per-window so if 'finalizeCheckpoint' is 
>> attempted (and fails) would these messages appear in a new window?
>>
> This is very much likely to happen with any source, if it would assign 
> something like *now* to event time. That is ill defined and if the 
> source cannot provide some retry-persistent estimate of real 
> event-time, than I'd suggest to force user to specify an UDF to 
> extract event time from the payload. Everything else would probably 
> break (at least if any timestamp-related windowing would be used in 
> the pipeline).
>>
>> Perhaps my question are now:
>> - how should a CheckpointMark should communicate failure to the Beam
>>
> An exception thrown should fail the checkpoint and therefore retry 
> everything from the last checkpoint.
>>
>> - how does Beam handle a CheckpointMark.finalizeCheckpoint failure, 
>> if the API dictates such a thing?
>>
> See above.
>>
>> - is there a provision that would need to be made for processing-time 
>> sources that can fail a checkpointmark.finalizeCheckpoint call? (I'm 
>> nervous redelivered messages would appear in another window)
>>
> You are nervous for a reason. :) I strongly believe processing time 
> source should be considered anti-pattern, at least in situations where 
> there is any time manipulation downstream (time-windows, stateful 
> processing, ...).
>>
>> - What is the relationship lifecycle-wise between a CheckpointMark 
>> and a Reader? My understanding is a CheckpointMark may outlive a 
>> Reader, is that correct?
>>
> Definitely. But the same instance bound to the lifecycle of the reader 
> would be used to finalizeCheckpoint (if that ever happens).
>>
>> Thanks for bearing with me everyone. It feels a bit unfortunate my 
>> first foray into beam is reliant on this rabbit connector but I'm 
>> learning a lot and I'm very grateful for the help. PRs pending once I 
>> get this all straightened out in my head.
>>
>> -Danny
>>
>> On 11/14/19 2:35 PM, Eugene Kirpichov wrote:
>>> Hi Daniel,
>>>
>>>
>>> On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert <daniel.robert@acm.org 
>>> <ma...@acm.org>> wrote:
>>>
>>>     I believe I've nailed down a situation that happens in practice
>>>     that causes Beam and Rabbit to be incompatible. It seems that
>>>     runners can and do make assumptions about the serializability
>>>     (via Coder) of a CheckpointMark.
>>>
>>>     To start, these are the semantics of RabbitMQ:
>>>
>>>     - the client establishes a connection to the server
>>>     - client opens a channel on the connection
>>>     - messages are either pulled or pushed to the client from the
>>>     server along this channel
>>>     - when messages are done processing, they are acknowledged
>>>     *client-side* and must be acknowledged on the *same channel*
>>>     that originally received the message.
>>>
>>>     Since a channel (or any open connection) is non-serializable, it
>>>     means that a CheckpointMark that has been serialized cannot ever
>>>     be used to acknowledge these messages and correctly 'finalize'
>>>     the checkpoint. It also, as previously discussed in this thread,
>>>     implies a rabbit Reader cannot accept an existing CheckpointMark
>>>     at all; the Reader and the CheckpointMark must share the same
>>>     connection to the rabbit server ("channel").
>>>
>>> This is correct.
>>>
>>>     Next, I've found how DirectRunner (and presumably others) can
>>>     attempt to serialize a CheckpointMark that has not been
>>>     finalized. In
>>>     https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
>>>     the DirectRunner applies a probability and if it hits, it sets
>>>     the current reader to 'null' but retains the existing
>>>     CheckpointMark, which it then attempts to pass to a new reader
>>>     via a Coder.
>>>
>>> Correct, this simulates a failure scenario:
>>> - Runner was reading the source and, after finalizing a bunch of 
>>> previous CheckpointMarks, obtained a new one and serialized it so 
>>> things can be restored in case of failure
>>> - A failure happened before the current CheckpointMark could be 
>>> finalized, which means Beam was not able to guarantee that elements 
>>> after the last-finalized mark have been durably processed, so we may 
>>> need to re-read them, so runner recreates a reader from the current 
>>> mark.
>>>
>>>     This puts the shard, the runner, and the reader with differing
>>>     views of the world. In UnboundedReadEvaluatorFactory's
>>>     processElement function, a call to getReader(shard) (
>>>     https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
>>>     ) clones the shard's checkpoint mark and passes that to the new
>>>     reader. The reader ignores it, creating its own, but even if it
>>>     accepted it, it would be accepting a serialized CheckpointMark,
>>>     which wouldn't work.
>>>
>>> Correct in the sense that for a RabbitMQ reader, a CheckpointMark 
>>> doesn't affect what the reader will read: it depends only on the 
>>> broker's internal state (which in turn depends on which messages 
>>> have been acked by previous finalized CheckpointMark's).
>>>
>>>     Later, the runner calls finishRead (
>>>     https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
>>>     ). The shard's CheckpointMark (unserialized; which should still
>>>     be valid) is finalized. The reader's CheckpointMark (which may
>>>     be a different instance) becomes the return value, which is
>>>     referred to as "finishedCheckpoint" in the calling code, which
>>>     is misleading at best and problematic at worst as *this*
>>>     checkpoint has not been finalized.
>>>
>>> I'm not following what is the problem here. In that code, "oldMark" 
>>> is the last checkpoint mark to be finalized - calling 
>>> finalizeCheckpoint on it signals that Beam has durably processed all 
>>> the messages read from the reader until that mark. "mark" (the new 
>>> one) represents the state of the reader after the last finalized 
>>> mark, so it should not be finalized.
>>>
>>> I.e. AFAIR in a hypothetical runner (which DirectRunner tries to 
>>> emulate) things go like this:
>>>
>>> Create a reader
>>> Let mark M1 = reader.getCheckpointMark()
>>> Durably persist M1 as the "restore point" of this reader
>>> ...read messages A B C from reader and durably process them...
>>> Finalize M1 (acks A B C)
>>> Let mark M2 = reader.getCheckpointMark()
>>> Durably persist M2 as the "restore point" of this reader
>>> ...read messages D E F and durably process them...
>>> Finalize M2 (acks D E F)
>>>
>>> Now let's imagine a failure.
>>> Durably persist M2 as the "restore point" of this reader
>>> ...read messages D E, and then a failure happens
>>> Recreate reader from M2 (reader ignores M2 but it doesn't matter)
>>> Since M2 was not finalized, messages D E F were not acked, and 
>>> RabbitMQ will redeliver them to this reader. D E will be processed 
>>> twice, but only the results of this new processing will be durable.
>>> Finalize M2 (acks D E F)
>>> Etc.
>>> Basically you can think of this as a series of micro-bundles, where 
>>> micro-bundles are delimited by checkpoint marks, and each 
>>> micro-bundle is a runner-side transaction which either commits or 
>>> discards the results of processing all messages in this 
>>> micro-bundle. After a micro-bundle [M1, M2) commits, the runner 
>>> calls M1.finalizeCheckpointMark() and persists M2 as the new restore 
>>> point in case of failure.
>>>
>>>     So, tl;dr: I cannot find any means of maintaining a persistent
>>>     connection to the server for finalizing checkpoints that is safe
>>>     across runners. If there's a guarantee all of the shards are on
>>>     the same JVM instance, I could rely on global, static
>>>     collections/instances as a workaround, but if other runners
>>>     might serialize this across the wire, I'm stumped. The only
>>>     workable situation I can think of right now is to proactively
>>>     acknowledge messages as they are received and effectively no-op
>>>     in finalizeCheckpoint. This is very different, semantically, and
>>>     can lead to dropped messages if a pipeline doesn't finish
>>>     processing the given message.
>>>
>>>     Any help would be much appreciated.
>>>
>>> If I'm misunderstanding something above, could you describe in 
>>> detail a scenario that leads to message loss, or (less severe) to 
>>> more-than-once durable processing of the same message?
>>>
>>>     Thanks,
>>>     -Danny
>>>
>>>     On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>>>>     Hi Daniel,
>>>>
>>>>     This is probably insufficiently well documented. The
>>>>     CheckpointMark is used for two purposes:
>>>>     1) To persistently store some notion of how much of the stream
>>>>     has been consumed, so that if something fails we can tell the
>>>>     underlying streaming system where to start reading when we
>>>>     re-create the reader. This is why CheckpointMark is
>>>>     Serializable. E.g. this makes sense for Kafka.
>>>>     2) To do acks - to let the underlying streaming system know
>>>>     that the Beam pipeline will never need data up to this
>>>>     CheckpointMark. Acking does not require serializability -
>>>>     runners call ack() on the same in-memory instance of
>>>>     CheckpointMark that was produced by the reader. E.g. this makes
>>>>     sense for RabbitMq or Pubsub.
>>>>
>>>>     In practice, these two capabilities tend to be mutually
>>>>     exclusive: some streaming systems can provide a serializable
>>>>     CheckpointMark, some can do acks, some can do neither - but
>>>>     very few (or none) can do both, and it's debatable whether it
>>>>     even makes sense for a system to provide both capabilities:
>>>>     usually acking is an implicit form of streaming-system-side
>>>>     checkpointing, i.e. when you re-create the reader you don't
>>>>     actually need to carry over any information from an old
>>>>     CheckpointMark - the necessary state (which records should be
>>>>     delivered) is maintained on the streaming system side.
>>>>
>>>>     These two are lumped together into one API simply because that
>>>>     was the best design option we came up with (not for lack of
>>>>     trying, but suggestions very much welcome - AFAIK nobody is
>>>>     happy with it).
>>>>
>>>>     RabbitMQ is under #2 - it can't do serializable checkpoint
>>>>     marks, but it can do acks. So you can simply ignore the
>>>>     non-serializability.
>>>>
>>>>     On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
>>>>     <daniel.robert@acm.org <ma...@acm.org>> wrote:
>>>>
>>>>         (Background: I recently upgraded RabbitMqIO from the 4.x to
>>>>         5.x library.
>>>>         As part of this I switched to a pull-based API rather than the
>>>>         previously-used push-based. This has caused some nebulous
>>>>         problems so
>>>>         put up a correction PR that I think needs some eyes fairly
>>>>         quickly as
>>>>         I'd consider master to be broken for rabbitmq right now.
>>>>         The PR keeps
>>>>         the upgrade but reverts to the same push-based
>>>>         implementation as in 4.x:
>>>>         https://github.com/apache/beam/pull/9977 )
>>>>
>>>>         Regardless, in trying to get the pull-based API to work,
>>>>         I'm finding the
>>>>         interactions between rabbitmq and beam with CheckpointMark
>>>>         to be
>>>>         fundamentally impossible to implement so I'm hoping for
>>>>         some input here.
>>>>
>>>>         CheckointMark itself must be Serializable, presumably this
>>>>         means it gets
>>>>         shuffled around between nodes. However 'Channel', the
>>>>         tunnel through
>>>>         which it communicates with Rabbit to ack messages and
>>>>         finalize the
>>>>         checkpoint, is non-Serializable. Like most other
>>>>         CheckpointMark
>>>>         implementations, Channel is 'transient'. When a new
>>>>         CheckpointMark is
>>>>         instantiated, it's given a Channel. If an existing one is
>>>>         supplied to
>>>>         the Reader's constructor (part of the 'startReader()'
>>>>         interface), the
>>>>         channel is overwritten.
>>>>
>>>>         *However*, Rabbit does not support 'ack'ing messages on a
>>>>         channel other
>>>>         than the one that consumed them in the first place.
>>>>         Attempting to do so
>>>>         results in a '406 (PRECONDITION-FAILED) - unknown delivery
>>>>         tag'. (See
>>>>         https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>>>
>>>>         ).
>>>>
>>>>         Truthfully, I don't really understand how the current
>>>>         implementation is
>>>>         working; it seems like a happy accident. But I'm curious if
>>>>         someone
>>>>         could help me debug and implement how to bridge the
>>>>         re-usable/serializable CheckpointMark requirement in Beam
>>>>         with this
>>>>         limitation of Rabbit.
>>>>
>>>>         Thanks,
>>>>         -Daniel Robert
>>>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Jan Lukavský <je...@seznam.cz>.
On 11/14/19 9:50 PM, Daniel Robert wrote:

> Alright, thanks everybody. I'm really appreciative of the conversation 
> here. I think I see where my disconnect is and how this might all work 
> together for me. There are some bugs in the current rabbit 
> implementation that I think have confused my understanding of the 
> intended semantics. I'm coming around to seeing how such a system with 
> rabbit's restrictions can work properly in Beam (I'd totally forgotten 
> about 'dedupe' support in Beam) but I want to clarify some 
> implementation questions after pulling everyone's notes together.
>
> RabbitMQ reader should not bother accepting an existing CheckpointMark 
> in its constructor (in 'ack-based' systems this is unnecessary per 
> Eugene's original reply). It should construct its own CheckpointMark 
> at construction time and use it throughout its lifecycle.
>
> At some point later, the CheckpointMark will be 'finalized'. If this 
> CheckpointMark has been Serialized (via Coder or otherwise) or its 
> underlying connection has been severed, this step will fail. This 
> would mean at some point the messages are redelivered to Beam on some 
> other Reader, so no data loss. If it has not been serialized, the acks 
> will take place just fine, even if much later.
>
> If the system is using processing-time as event-time, however, the 
> redelivery of these messages would effectively change the ordering and 
> potentially the window they arrived in. I *believe* that Beam deduping 
> seems to be managed per-window so if 'finalizeCheckpoint' is attempted 
> (and fails) would these messages appear in a new window?
>
This is very much likely to happen with any source that assigns 
something like *now* as the event time. That is ill-defined, and if the 
source cannot provide some retry-persistent estimate of the real event 
time, then I'd suggest forcing the user to specify a UDF to extract the 
event time from the payload. Everything else would probably break (at 
least if any timestamp-related windowing is used in the pipeline).
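Something along these lines -- 'withTimestampFn' is purely hypothetical 
here, and the payload format is an assumption; the point is only that 
the function must be deterministic, so redelivered messages get the same 
event time:

    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.joda.time.Instant;

    // Assumes the payload carries an epoch-millis timestamp as text.
    SerializableFunction<RabbitMqMessage, Instant> timestampFn =
        message -> new Instant(
            Long.parseLong(new String(message.getBody(), StandardCharsets.UTF_8)));

    // Hypothetical wiring, not an existing builder:
    //   RabbitMqIO.read().withQueue("events").withTimestampFn(timestampFn)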
>
> Perhaps my question are now:
> - how should a CheckpointMark should communicate failure to the Beam
>
An exception thrown there should fail the checkpoint, and therefore 
everything since the last checkpoint will be retried.
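I.e. roughly this shape (a sketch, not the actual connector code; it 
assumes the mark holds a transient Channel plus the delivery tags read 
since the last finalization):

    import java.io.IOException;
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import com.rabbitmq.client.Channel;
    import org.apache.beam.sdk.io.UnboundedSource;

    /** Sketch only; names are placeholders. */
    class RabbitCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
      private transient Channel channel;  // never survives serialization
      private final List<Long> pendingDeliveryTags = new ArrayList<>();

      RabbitCheckpointMark(Channel channel) {
        this.channel = channel;
      }

      void add(long deliveryTag) {
        pendingDeliveryTags.add(deliveryTag);
      }

      @Override
      public void finalizeCheckpoint() throws IOException {
        // If this mark was serialized (channel == null) or the channel
        // died, throwing fails the checkpoint; the broker redelivers the
        // un-acked messages and Beam retries from the last finalized mark.
        if (channel == null || !channel.isOpen()) {
          throw new IOException("Channel that received these messages is gone");
        }
        for (long tag : pendingDeliveryTags) {
          channel.basicAck(tag, false);
        }
        pendingDeliveryTags.clear();
      }
    }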
>
> - how does Beam handle a CheckpointMark.finalizeCheckpoint failure, if 
> the API dictates such a thing?
>
See above.
>
> - is there a provision that would need to be made for processing-time 
> sources that can fail a checkpointmark.finalizeCheckpoint call? (I'm 
> nervous redelivered messages would appear in another window)
>
You are nervous for a reason. :) I strongly believe a processing-time 
source should be considered an anti-pattern, at least in situations 
where there is any time manipulation downstream (time windows, stateful 
processing, ...).
>
> - What is the relationship lifecycle-wise between a CheckpointMark and 
> a Reader? My understanding is a CheckpointMark may outlive a Reader, 
> is that correct?
>
Definitely. But the same instance bound to the lifecycle of the reader 
would be used to finalizeCheckpoint (if that ever happens).
>
> Thanks for bearing with me everyone. It feels a bit unfortunate my 
> first foray into beam is reliant on this rabbit connector but I'm 
> learning a lot and I'm very grateful for the help. PRs pending once I 
> get this all straightened out in my head.
>
> -Danny
>
> On 11/14/19 2:35 PM, Eugene Kirpichov wrote:
>> Hi Daniel,
>>
>>
>> On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert <daniel.robert@acm.org 
>> <ma...@acm.org>> wrote:
>>
>>     I believe I've nailed down a situation that happens in practice
>>     that causes Beam and Rabbit to be incompatible. It seems that
>>     runners can and do make assumptions about the serializability
>>     (via Coder) of a CheckpointMark.
>>
>>     To start, these are the semantics of RabbitMQ:
>>
>>     - the client establishes a connection to the server
>>     - client opens a channel on the connection
>>     - messages are either pulled or pushed to the client from the
>>     server along this channel
>>     - when messages are done processing, they are acknowledged
>>     *client-side* and must be acknowledged on the *same channel* that
>>     originally received the message.
>>
>>     Since a channel (or any open connection) is non-serializable, it
>>     means that a CheckpointMark that has been serialized cannot ever
>>     be used to acknowledge these messages and correctly 'finalize'
>>     the checkpoint. It also, as previously discussed in this thread,
>>     implies a rabbit Reader cannot accept an existing CheckpointMark
>>     at all; the Reader and the CheckpointMark must share the same
>>     connection to the rabbit server ("channel").
>>
>> This is correct.
>>
>>     Next, I've found how DirectRunner (and presumably others) can
>>     attempt to serialize a CheckpointMark that has not been
>>     finalized. In
>>     https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
>>     the DirectRunner applies a probability and if it hits, it sets
>>     the current reader to 'null' but retains the existing
>>     CheckpointMark, which it then attempts to pass to a new reader
>>     via a Coder.
>>
>> Correct, this simulates a failure scenario:
>> - Runner was reading the source and, after finalizing a bunch of 
>> previous CheckpointMarks, obtained a new one and serialized it so 
>> things can be restored in case of failure
>> - A failure happened before the current CheckpointMark could be 
>> finalized, which means Beam was not able to guarantee that elements 
>> after the last-finalized mark have been durably processed, so we may 
>> need to re-read them, so runner recreates a reader from the current mark.
>>
>>     This puts the shard, the runner, and the reader with differing
>>     views of the world. In UnboundedReadEvaluatorFactory's
>>     processElement function, a call to getReader(shard) (
>>     https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
>>     ) clones the shard's checkpoint mark and passes that to the new
>>     reader. The reader ignores it, creating its own, but even if it
>>     accepted it, it would be accepting a serialized CheckpointMark,
>>     which wouldn't work.
>>
>> Correct in the sense that for a RabbitMQ reader, a CheckpointMark 
>> doesn't affect what the reader will read: it depends only on the 
>> broker's internal state (which in turn depends on which messages have 
>> been acked by previous finalized CheckpointMark's).
>>
>>     Later, the runner calls finishRead (
>>     https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
>>     ). The shard's CheckpointMark (unserialized; which should still
>>     be valid) is finalized. The reader's CheckpointMark (which may be
>>     a different instance) becomes the return value, which is referred
>>     to as "finishedCheckpoint" in the calling code, which is
>>     misleading at best and problematic at worst as *this* checkpoint
>>     has not been finalized.
>>
>> I'm not following what is the problem here. In that code, "oldMark" 
>> is the last checkpoint mark to be finalized - calling 
>> finalizeCheckpoint on it signals that Beam has durably processed all 
>> the messages read from the reader until that mark. "mark" (the new 
>> one) represents the state of the reader after the last finalized 
>> mark, so it should not be finalized.
>>
>> I.e. AFAIR in a hypothetical runner (which DirectRunner tries to 
>> emulate) things go like this:
>>
>> Create a reader
>> Let mark M1 = reader.getCheckpointMark()
>> Durably persist M1 as the "restore point" of this reader
>> ...read messages A B C from reader and durably process them...
>> Finalize M1 (acks A B C)
>> Let mark M2 = reader.getCheckpointMark()
>> Durably persist M2 as the "restore point" of this reader
>> ...read messages D E F and durably process them...
>> Finalize M2 (acks D E F)
>>
>> Now let's imagine a failure.
>> Durably persist M2 as the "restore point" of this reader
>> ...read messages D E, and then a failure happens
>> Recreate reader from M2 (reader ignores M2 but it doesn't matter)
>> Since M2 was not finalized, messages D E F were not acked, and 
>> RabbitMQ will redeliver them to this reader. D E will be processed 
>> twice, but only the results of this new processing will be durable.
>> Finalize M2 (acks D E F)
>> Etc.
>> Basically you can think of this as a series of micro-bundles, where 
>> micro-bundles are delimited by checkpoint marks, and each 
>> micro-bundle is a runner-side transaction which either commits or 
>> discards the results of processing all messages in this micro-bundle. 
>> After a micro-bundle [M1, M2) commits, the runner calls 
>> M1.finalizeCheckpointMark() and persists M2 as the new restore point 
>> in case of failure.
>>
>>     So, tl;dr: I cannot find any means of maintaining a persistent
>>     connection to the server for finalizing checkpoints that is safe
>>     across runners. If there's a guarantee all of the shards are on
>>     the same JVM instance, I could rely on global, static
>>     collections/instances as a workaround, but if other runners might
>>     serialize this across the wire, I'm stumped. The only workable
>>     situation I can think of right now is to proactively acknowledge
>>     messages as they are received and effectively no-op in
>>     finalizeCheckpoint. This is very different, semantically, and can
>>     lead to dropped messages if a pipeline doesn't finish processing
>>     the given message.
>>
>>     Any help would be much appreciated.
>>
>> If I'm misunderstanding something above, could you describe in detail 
>> a scenario that leads to message loss, or (less severe) to 
>> more-than-once durable processing of the same message?
>>
>>     Thanks,
>>     -Danny
>>
>>     On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>>>     Hi Daniel,
>>>
>>>     This is probably insufficiently well documented. The
>>>     CheckpointMark is used for two purposes:
>>>     1) To persistently store some notion of how much of the stream
>>>     has been consumed, so that if something fails we can tell the
>>>     underlying streaming system where to start reading when we
>>>     re-create the reader. This is why CheckpointMark is
>>>     Serializable. E.g. this makes sense for Kafka.
>>>     2) To do acks - to let the underlying streaming system know that
>>>     the Beam pipeline will never need data up to this
>>>     CheckpointMark. Acking does not require serializability -
>>>     runners call ack() on the same in-memory instance of
>>>     CheckpointMark that was produced by the reader. E.g. this makes
>>>     sense for RabbitMq or Pubsub.
>>>
>>>     In practice, these two capabilities tend to be mutually
>>>     exclusive: some streaming systems can provide a serializable
>>>     CheckpointMark, some can do acks, some can do neither - but very
>>>     few (or none) can do both, and it's debatable whether it even
>>>     makes sense for a system to provide both capabilities: usually
>>>     acking is an implicit form of streaming-system-side
>>>     checkpointing, i.e. when you re-create the reader you don't
>>>     actually need to carry over any information from an old
>>>     CheckpointMark - the necessary state (which records should be
>>>     delivered) is maintained on the streaming system side.
>>>
>>>     These two are lumped together into one API simply because that
>>>     was the best design option we came up with (not for lack of
>>>     trying, but suggestions very much welcome - AFAIK nobody is
>>>     happy with it).
>>>
>>>     RabbitMQ is under #2 - it can't do serializable checkpoint
>>>     marks, but it can do acks. So you can simply ignore the
>>>     non-serializability.
>>>
>>>     On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
>>>     <daniel.robert@acm.org <ma...@acm.org>> wrote:
>>>
>>>         (Background: I recently upgraded RabbitMqIO from the 4.x to
>>>         5.x library.
>>>         As part of this I switched to a pull-based API rather than the
>>>         previously-used push-based. This has caused some nebulous
>>>         problems so
>>>         put up a correction PR that I think needs some eyes fairly
>>>         quickly as
>>>         I'd consider master to be broken for rabbitmq right now. The
>>>         PR keeps
>>>         the upgrade but reverts to the same push-based
>>>         implementation as in 4.x:
>>>         https://github.com/apache/beam/pull/9977 )
>>>
>>>         Regardless, in trying to get the pull-based API to work, I'm
>>>         finding the
>>>         interactions between rabbitmq and beam with CheckpointMark
>>>         to be
>>>         fundamentally impossible to implement so I'm hoping for some
>>>         input here.
>>>
>>>         CheckointMark itself must be Serializable, presumably this
>>>         means it gets
>>>         shuffled around between nodes. However 'Channel', the tunnel
>>>         through
>>>         which it communicates with Rabbit to ack messages and
>>>         finalize the
>>>         checkpoint, is non-Serializable. Like most other CheckpointMark
>>>         implementations, Channel is 'transient'. When a new
>>>         CheckpointMark is
>>>         instantiated, it's given a Channel. If an existing one is
>>>         supplied to
>>>         the Reader's constructor (part of the 'startReader()'
>>>         interface), the
>>>         channel is overwritten.
>>>
>>>         *However*, Rabbit does not support 'ack'ing messages on a
>>>         channel other
>>>         than the one that consumed them in the first place.
>>>         Attempting to do so
>>>         results in a '406 (PRECONDITION-FAILED) - unknown delivery
>>>         tag'. (See
>>>         https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>>
>>>         ).
>>>
>>>         Truthfully, I don't really understand how the current
>>>         implementation is
>>>         working; it seems like a happy accident. But I'm curious if
>>>         someone
>>>         could help me debug and implement how to bridge the
>>>         re-usable/serializable CheckpointMark requirement in Beam
>>>         with this
>>>         limitation of Rabbit.
>>>
>>>         Thanks,
>>>         -Daniel Robert
>>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Daniel Robert <da...@acm.org>.
Alright, thanks everybody. I'm really appreciative of the conversation 
here. I think I see where my disconnect is and how this might all work 
together for me. There are some bugs in the current rabbit 
implementation that I think have confused my understanding of the 
intended semantics. I'm coming around to seeing how such a system with 
rabbit's restrictions can work properly in Beam (I'd totally forgotten 
about 'dedupe' support in Beam) but I want to clarify some 
implementation questions after pulling everyone's notes together.

The RabbitMQ reader should not bother accepting an existing CheckpointMark 
in its constructor (in 'ack-based' systems this is unnecessary, per 
Eugene's original reply). It should construct its own CheckpointMark at 
construction time and use it throughout its lifecycle.
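
In code terms I'm picturing roughly this (a sketch; 'RabbitMQReader' and 
'RabbitMQCheckpointMark' are placeholder names, not the final classes):

    // The source ignores whatever restored mark the runner hands back; it
    // cannot carry a live Channel, so it is of no use to a new reader.
    @Override
    public UnboundedReader<RabbitMqMessage> createReader(
        PipelineOptions options, RabbitMQCheckpointMark ignoredRestoredMark)
        throws IOException {
      return new RabbitMQReader(this);
    }

    // ...and inside the reader's constructor, roughly:
    //   this.channel = channelProvider.channel();
    //   this.checkpointMark = new RabbitMQCheckpointMark(channel);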

At some point later, the CheckpointMark will be 'finalized'. If this 
CheckpointMark has been serialized (via Coder or otherwise) or its 
underlying connection has been severed, this step will fail. This would 
mean at some point the messages are redelivered to Beam on some other 
Reader, so no data loss. If it has not been serialized, the acks will 
take place just fine, even if much later.

If the system is using processing-time as event-time, however, the 
redelivery of these messages would effectively change the ordering and 
potentially the window they arrive in. I *believe* Beam deduping is 
managed per-window, so if 'finalizeCheckpoint' is attempted (and fails), 
would these messages appear in a new window?

Perhaps my questions are now:
- how should a CheckpointMark communicate failure to Beam?
- how does Beam handle a CheckpointMark.finalizeCheckpoint failure, if 
the API dictates such a thing?
- is there a provision that would need to be made for processing-time 
sources that can fail a checkpointmark.finalizeCheckpoint call? (I'm 
nervous redelivered messages would appear in another window)
- What is the relationship lifecycle-wise between a CheckpointMark and a 
Reader? My understanding is a CheckpointMark may outlive a Reader, is 
that correct?

Thanks for bearing with me everyone. It feels a bit unfortunate my first 
foray into beam is reliant on this rabbit connector but I'm learning a 
lot and I'm very grateful for the help. PRs pending once I get this all 
straightened out in my head.

-Danny

On 11/14/19 2:35 PM, Eugene Kirpichov wrote:
> Hi Daniel,
>
>
> On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert <daniel.robert@acm.org 
> <ma...@acm.org>> wrote:
>
>     I believe I've nailed down a situation that happens in practice
>     that causes Beam and Rabbit to be incompatible. It seems that
>     runners can and do make assumptions about the serializability (via
>     Coder) of a CheckpointMark.
>
>     To start, these are the semantics of RabbitMQ:
>
>     - the client establishes a connection to the server
>     - client opens a channel on the connection
>     - messages are either pulled or pushed to the client from the
>     server along this channel
>     - when messages are done processing, they are acknowledged
>     *client-side* and must be acknowledged on the *same channel* that
>     originally received the message.
>
>     Since a channel (or any open connection) is non-serializable, it
>     means that a CheckpointMark that has been serialized cannot ever
>     be used to acknowledge these messages and correctly 'finalize' the
>     checkpoint. It also, as previously discussed in this thread,
>     implies a rabbit Reader cannot accept an existing CheckpointMark
>     at all; the Reader and the CheckpointMark must share the same
>     connection to the rabbit server ("channel").
>
> This is correct.
>
>     Next, I've found how DirectRunner (and presumably others) can
>     attempt to serialize a CheckpointMark that has not been finalized.
>     In
>     https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
>     the DirectRunner applies a probability and if it hits, it sets the
>     current reader to 'null' but retains the existing CheckpointMark,
>     which it then attempts to pass to a new reader via a Coder.
>
> Correct, this simulates a failure scenario:
> - Runner was reading the source and, after finalizing a bunch of 
> previous CheckpointMarks, obtained a new one and serialized it so 
> things can be restored in case of failure
> - A failure happened before the current CheckpointMark could be 
> finalized, which means Beam was not able to guarantee that elements 
> after the last-finalized mark have been durably processed, so we may 
> need to re-read them, so runner recreates a reader from the current mark.
>
>     This puts the shard, the runner, and the reader with differing
>     views of the world. In UnboundedReadEvaluatorFactory's
>     processElement function, a call to getReader(shard) (
>     https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
>     ) clones the shard's checkpoint mark and passes that to the new
>     reader. The reader ignores it, creating its own, but even if it
>     accepted it, it would be accepting a serialized CheckpointMark,
>     which wouldn't work.
>
> Correct in the sense that for a RabbitMQ reader, a CheckpointMark 
> doesn't affect what the reader will read: it depends only on the 
> broker's internal state (which in turn depends on which messages have 
> been acked by previous finalized CheckpointMark's).
>
>     Later, the runner calls finishRead (
>     https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
>     ). The shard's CheckpointMark (unserialized; which should still be
>     valid) is finalized. The reader's CheckpointMark (which may be a
>     different instance) becomes the return value, which is referred to
>     as "finishedCheckpoint" in the calling code, which is misleading
>     at best and problematic at worst as *this* checkpoint has not been
>     finalized.
>
> I'm not following what is the problem here. In that code, "oldMark" is 
> the last checkpoint mark to be finalized - calling finalizeCheckpoint 
> on it signals that Beam has durably processed all the messages read 
> from the reader until that mark. "mark" (the new one) represents the 
> state of the reader after the last finalized mark, so it should not be 
> finalized.
>
> I.e. AFAIR in a hypothetical runner (which DirectRunner tries to 
> emulate) things go like this:
>
> Create a reader
> Let mark M1 = reader.getCheckpointMark()
> Durably persist M1 as the "restore point" of this reader
> ...read messages A B C from reader and durably process them...
> Finalize M1 (acks A B C)
> Let mark M2 = reader.getCheckpointMark()
> Durably persist M2 as the "restore point" of this reader
> ...read messages D E F and durably process them...
> Finalize M2 (acks D E F)
>
> Now let's imagine a failure.
> Durably persist M2 as the "restore point" of this reader
> ...read messages D E, and then a failure happens
> Recreate reader from M2 (reader ignores M2 but it doesn't matter)
> Since M2 was not finalized, messages D E F were not acked, and 
> RabbitMQ will redeliver them to this reader. D E will be processed 
> twice, but only the results of this new processing will be durable.
> Finalize M2 (acks D E F)
> Etc.
> Basically you can think of this as a series of micro-bundles, where 
> micro-bundles are delimited by checkpoint marks, and each micro-bundle 
> is a runner-side transaction which either commits or discards the 
> results of processing all messages in this micro-bundle. After a 
> micro-bundle [M1, M2) commits, the runner calls 
> M1.finalizeCheckpointMark() and persists M2 as the new restore point 
> in case of failure.
>
>     So, tl;dr: I cannot find any means of maintaining a persistent
>     connection to the server for finalizing checkpoints that is safe
>     across runners. If there's a guarantee all of the shards are on
>     the same JVM instance, I could rely on global, static
>     collections/instances as a workaround, but if other runners might
>     serialize this across the wire, I'm stumped. The only workable
>     situation I can think of right now is to proactively acknowledge
>     messages as they are received and effectively no-op in
>     finalizeCheckpoint. This is very different, semantically, and can
>     lead to dropped messages if a pipeline doesn't finish processing
>     the given message.
>
>     Any help would be much appreciated.
>
> If I'm misunderstanding something above, could you describe in detail 
> a scenario that leads to message loss, or (less severe) to 
> more-than-once durable processing of the same message?
>
>     Thanks,
>     -Danny
>
>     On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>>     Hi Daniel,
>>
>>     This is probably insufficiently well documented. The
>>     CheckpointMark is used for two purposes:
>>     1) To persistently store some notion of how much of the stream
>>     has been consumed, so that if something fails we can tell the
>>     underlying streaming system where to start reading when we
>>     re-create the reader. This is why CheckpointMark is Serializable.
>>     E.g. this makes sense for Kafka.
>>     2) To do acks - to let the underlying streaming system know that
>>     the Beam pipeline will never need data up to this CheckpointMark.
>>     Acking does not require serializability - runners call ack() on
>>     the same in-memory instance of CheckpointMark that was produced
>>     by the reader. E.g. this makes sense for RabbitMq or Pubsub.
>>
>>     In practice, these two capabilities tend to be mutually
>>     exclusive: some streaming systems can provide a serializable
>>     CheckpointMark, some can do acks, some can do neither - but very
>>     few (or none) can do both, and it's debatable whether it even
>>     makes sense for a system to provide both capabilities: usually
>>     acking is an implicit form of streaming-system-side
>>     checkpointing, i.e. when you re-create the reader you don't
>>     actually need to carry over any information from an old
>>     CheckpointMark - the necessary state (which records should be
>>     delivered) is maintained on the streaming system side.
>>
>>     These two are lumped together into one API simply because that
>>     was the best design option we came up with (not for lack of
>>     trying, but suggestions very much welcome - AFAIK nobody is happy
>>     with it).
>>
>>     RabbitMQ is under #2 - it can't do serializable checkpoint marks,
>>     but it can do acks. So you can simply ignore the non-serializability.
>>
>>     On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
>>     <daniel.robert@acm.org <ma...@acm.org>> wrote:
>>
>>         (Background: I recently upgraded RabbitMqIO from the 4.x to
>>         5.x library.
>>         As part of this I switched to a pull-based API rather than the
>>         previously-used push-based. This has caused some nebulous
>>         problems so
>>         put up a correction PR that I think needs some eyes fairly
>>         quickly as
>>         I'd consider master to be broken for rabbitmq right now. The
>>         PR keeps
>>         the upgrade but reverts to the same push-based implementation
>>         as in 4.x:
>>         https://github.com/apache/beam/pull/9977 )
>>
>>         Regardless, in trying to get the pull-based API to work, I'm
>>         finding the
>>         interactions between rabbitmq and beam with CheckpointMark to be
>>         fundamentally impossible to implement so I'm hoping for some
>>         input here.
>>
>>         CheckointMark itself must be Serializable, presumably this
>>         means it gets
>>         shuffled around between nodes. However 'Channel', the tunnel
>>         through
>>         which it communicates with Rabbit to ack messages and
>>         finalize the
>>         checkpoint, is non-Serializable. Like most other CheckpointMark
>>         implementations, Channel is 'transient'. When a new
>>         CheckpointMark is
>>         instantiated, it's given a Channel. If an existing one is
>>         supplied to
>>         the Reader's constructor (part of the 'startReader()'
>>         interface), the
>>         channel is overwritten.
>>
>>         *However*, Rabbit does not support 'ack'ing messages on a
>>         channel other
>>         than the one that consumed them in the first place.
>>         Attempting to do so
>>         results in a '406 (PRECONDITION-FAILED) - unknown delivery
>>         tag'. (See
>>         https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>
>>         ).
>>
>>         Truthfully, I don't really understand how the current
>>         implementation is
>>         working; it seems like a happy accident. But I'm curious if
>>         someone
>>         could help me debug and implement how to bridge the
>>         re-usable/serializable CheckpointMark requirement in Beam
>>         with this
>>         limitation of Rabbit.
>>
>>         Thanks,
>>         -Daniel Robert
>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Eugene Kirpichov <jk...@google.com>.
Hi Daniel,


On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert <da...@acm.org> wrote:

> I believe I've nailed down a situation that happens in practice that
> causes Beam and Rabbit to be incompatible. It seems that runners can and do
> make assumptions about the serializability (via Coder) of a CheckpointMark.
>
> To start, these are the semantics of RabbitMQ:
>
> - the client establishes a connection to the server
> - client opens a channel on the connection
> - messages are either pulled or pushed to the client from the server along
> this channel
> - when messages are done processing, they are acknowledged *client-side*
> and must be acknowledged on the *same channel* that originally received the
> message.
>
> Since a channel (or any open connection) is non-serializable, it means
> that a CheckpointMark that has been serialized cannot ever be used to
> acknowledge these messages and correctly 'finalize' the checkpoint. It
> also, as previously discussed in this thread, implies a rabbit Reader
> cannot accept an existing CheckpointMark at all; the Reader and the
> CheckpointMark must share the same connection to the rabbit server
> ("channel").
>
This is correct.


> Next, I've found how DirectRunner (and presumably others) can attempt to
> serialize a CheckpointMark that has not been finalized. In
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
> the DirectRunner applies a probability and if it hits, it sets the current
> reader to 'null' but retains the existing CheckpointMark, which it then
> attempts to pass to a new reader via a Coder.
>
Correct, this simulates a failure scenario:
- Runner was reading the source and, after finalizing a bunch of previous
CheckpointMarks, obtained a new one and serialized it so things can be
restored in case of failure
- A failure happened before the current CheckpointMark could be finalized,
which means Beam was not able to guarantee that elements after the
last-finalized mark have been durably processed, so we may need to re-read
them, so the runner recreates a reader from the current mark.


> This puts the shard, the runner, and the reader with differing views of
> the world. In UnboundedReadEvaluatorFactory's processElement function, a
> call to getReader(shard) (
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
> ) clones the shard's checkpoint mark and passes that to the new reader. The
> reader ignores it, creating its own, but even if it accepted it, it would
> be accepting a serialized CheckpointMark, which wouldn't work.
>
Correct in the sense that for a RabbitMQ reader, a CheckpointMark doesn't
affect what the reader will read: it depends only on the broker's internal
state (which in turn depends on which messages have been acked by previous
finalized CheckpointMark's).

> Later, the runner calls finishRead (
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
> ). The shard's CheckpointMark (unserialized; which should still be valid)
> is finalized. The reader's CheckpointMark (which may be a different
> instance) becomes the return value, which is referred to as
> "finishedCheckpoint" in the calling code, which is misleading at best and
> problematic at worst as *this* checkpoint has not been finalized.
>
I'm not following what the problem is here. In that code, "oldMark" is the
last checkpoint mark to be finalized - calling finalizeCheckpoint on it
signals that Beam has durably processed all the messages read from the
reader until that mark. "mark" (the new one) represents the state of the
reader after the last finalized mark, so it should not be finalized.

I.e. AFAIR in a hypothetical runner (which DirectRunner tries to emulate)
things go like this:

Create a reader
Let mark M1 = reader.getCheckpointMark()
Durably persist M1 as the "restore point" of this reader
...read messages A B C from reader and durably process them...
Finalize M1 (acks A B C)
Let mark M2 = reader.getCheckpointMark()
Durably persist M2 as the "restore point" of this reader
...read messages D E F and durably process them...
Finalize M2 (acks D E F)

Now let's imagine a failure.
Durably persist M2 as the "restore point" of this reader
...read messages D E, and then a failure happens
Recreate reader from M2 (reader ignores M2 but it doesn't matter)
Since M2 was not finalized, messages D E F were not acked, and RabbitMQ
will redeliver them to this reader. D E will be processed twice, but only
the results of this new processing will be durable.
Finalize M2 (acks D E F)
Etc.

Basically you can think of this as a series of micro-bundles, where
micro-bundles are delimited by checkpoint marks, and each micro-bundle is a
runner-side transaction which either commits or discards the results of
processing all messages in this micro-bundle. After a micro-bundle [M1, M2)
commits, the runner calls M1.finalizeCheckpointMark() and persists M2 as
the new restore point in case of failure.
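
Or, transliterating the sequence above into pseudo-Java (persist() and 
readAndProcessDurably() stand in for runner-side machinery; this is not 
real runner code):

    UnboundedReader<T> reader = source.createReader(options, null);

    CheckpointMark m1 = reader.getCheckpointMark();
    persist(m1);                       // restore point
    readAndProcessDurably(reader);     // messages A B C
    m1.finalizeCheckpoint();           // acks A B C

    CheckpointMark m2 = reader.getCheckpointMark();
    persist(m2);                       // new restore point
    readAndProcessDurably(reader);     // messages D E F
    m2.finalizeCheckpoint();           // acks D E F

    // On a failure after persisting m2 but before finalizing it, the
    // runner deserializes m2 and calls source.createReader(options, m2);
    // the RabbitMQ reader ignores m2, and the broker redelivers the
    // un-acked D E F.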

> So, tl;dr: I cannot find any means of maintaining a persistent connection
> to the server for finalizing checkpoints that is safe across runners. If
> there's a guarantee all of the shards are on the same JVM instance, I could
> rely on global, static collections/instances as a workaround, but if other
> runners might serialize this across the wire, I'm stumped. The only
> workable situation I can think of right now is to proactively acknowledge
> messages as they are received and effectively no-op in finalizeCheckpoint.
> This is very different, semantically, and can lead to dropped messages if a
> pipeline doesn't finish processing the given message.
>
> Any help would be much appreciated.
>
If I'm misunderstanding something above, could you describe in detail a
scenario that leads to message loss, or (less severe) to more-than-once
durable processing of the same message?


> Thanks,
> -Danny
> On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
>
> Hi Daniel,
>
> This is probably insufficiently well documented. The CheckpointMark is
> used for two purposes:
> 1) To persistently store some notion of how much of the stream has been
> consumed, so that if something fails we can tell the underlying streaming
> system where to start reading when we re-create the reader. This is why
> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
> 2) To do acks - to let the underlying streaming system know that the Beam
> pipeline will never need data up to this CheckpointMark. Acking does not
> require serializability - runners call ack() on the same in-memory instance
> of CheckpointMark that was produced by the reader. E.g. this makes sense
> for RabbitMq or Pubsub.
>
> In practice, these two capabilities tend to be mutually exclusive: some
> streaming systems can provide a serializable CheckpointMark, some can do
> acks, some can do neither - but very few (or none) can do both, and it's
> debatable whether it even makes sense for a system to provide both
> capabilities: usually acking is an implicit form of streaming-system-side
> checkpointing, i.e. when you re-create the reader you don't actually need
> to carry over any information from an old CheckpointMark - the necessary
> state (which records should be delivered) is maintained on the streaming
> system side.
>
> These two are lumped together into one API simply because that was the
> best design option we came up with (not for lack of trying, but suggestions
> very much welcome - AFAIK nobody is happy with it).
>
> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
> can do acks. So you can simply ignore the non-serializability.
>
> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <da...@acm.org>
> wrote:
>
>> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
>> As part of this I switched to a pull-based API rather than the
>> previously-used push-based. This has caused some nebulous problems so
>> put up a correction PR that I think needs some eyes fairly quickly as
>> I'd consider master to be broken for rabbitmq right now. The PR keeps
>> the upgrade but reverts to the same push-based implementation as in 4.x:
>> https://github.com/apache/beam/pull/9977 )
>>
>> Regardless, in trying to get the pull-based API to work, I'm finding the
>> interactions between rabbitmq and beam with CheckpointMark to be
>> fundamentally impossible to implement so I'm hoping for some input here.
>>
>> CheckointMark itself must be Serializable, presumably this means it gets
>> shuffled around between nodes. However 'Channel', the tunnel through
>> which it communicates with Rabbit to ack messages and finalize the
>> checkpoint, is non-Serializable. Like most other CheckpointMark
>> implementations, Channel is 'transient'. When a new CheckpointMark is
>> instantiated, it's given a Channel. If an existing one is supplied to
>> the Reader's constructor (part of the 'startReader()' interface), the
>> channel is overwritten.
>>
>> *However*, Rabbit does not support 'ack'ing messages on a channel other
>> than the one that consumed them in the first place. Attempting to do so
>> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>>
>> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>> ).
>>
>> Truthfully, I don't really understand how the current implementation is
>> working; it seems like a happy accident. But I'm curious if someone
>> could help me debug and implement how to bridge the
>> re-usable/serializable CheckpointMark requirement in Beam with this
>> limitation of Rabbit.
>>
>> Thanks,
>> -Daniel Robert
>>
>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Daniel Robert <da...@acm.org>.
I believe I've nailed down a situation that happens in practice and
causes Beam and Rabbit to be incompatible. It seems that runners can and
do make assumptions about the serializability (via Coder) of a 
CheckpointMark.

To start, these are the semantics of RabbitMQ:

- the client establishes a connection to the server
- client opens a channel on the connection
- messages are either pulled or pushed to the client from the server 
along this channel
- when messages are done processing, they are acknowledged *client-side* 
and must be acknowledged on the *same channel* that originally received 
the message.

Since a channel (or any open connection) is non-serializable, a
CheckpointMark that has been serialized can never be used to acknowledge
these messages and correctly 'finalize' the checkpoint. As previously
discussed in this thread, this also implies a rabbit Reader cannot accept
an existing CheckpointMark at all; the Reader and the CheckpointMark must
share the same connection to the rabbit server ("channel").

Next, I've found how the DirectRunner (and presumably other runners) can
attempt to serialize a CheckpointMark that has not been finalized. In
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
the DirectRunner applies a probability and, if it hits, sets the current
reader to 'null' but retains the existing CheckpointMark, which it then
attempts to pass to a new reader via a Coder.

This leaves the shard, the runner, and the reader with differing views of 
the world. In UnboundedReadEvaluatorFactory's processElement function, a 
call to getReader(shard) ( 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132 
) clones the shard's checkpoint mark and passes that to the new reader. 
The reader ignores it, creating its own, but even if it accepted it, it 
would be accepting a serialized CheckpointMark, which wouldn't work. 
Later, the runner calls finishRead ( 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246 
). The shard's CheckpointMark (never serialized, and thus still valid) is
finalized. The reader's CheckpointMark (which may be a different instance)
becomes the return value, which the calling code refers to as
"finishedCheckpoint"; that name is misleading at best and problematic at
worst, as *this* checkpoint has not been finalized.

So, tl;dr: I cannot find any means of maintaining a persistent 
connection to the server for finalizing checkpoints that is safe across 
runners. If there's a guarantee all of the shards are on the same JVM 
instance, I could rely on global, static collections/instances as a 
workaround, but if other runners might serialize this across the wire, 
I'm stumped. The only workable approach I can think of right now is to 
proactively acknowledge messages as they are received and effectively 
no-op in finalizeCheckpoint. This is very different, semantically, and 
can lead to dropped messages if a pipeline doesn't finish processing the 
given message.
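
For clarity, that workaround would look roughly like the following two
fragments, one from the reader and one from the mark (illustrative names;
'channel', 'queueName', and 'currentBody' are assumed fields of the
reader). It effectively trades at-least-once delivery for at-most-once.

    // In the reader: ack as soon as the message is pulled, before Beam has
    // durably processed it.
    @Override
    public boolean advance() throws IOException {
      GetResponse response = channel.basicGet(queueName, false);
      if (response == null) {
        return false;
      }
      currentBody = response.getBody();
      channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
      return true;
    }

    // In the checkpoint mark: nothing left to do.
    @Override
    public void finalizeCheckpoint() {
      // No-op: messages were already acked at read time, so a failure
      // between read and downstream processing drops them.
    }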

Any help would be much appreciated.

Thanks,
-Danny

On 11/7/19 10:27 PM, Eugene Kirpichov wrote:
> Hi Daniel,
>
> This is probably insufficiently well documented. The CheckpointMark is 
> used for two purposes:
> 1) To persistently store some notion of how much of the stream has 
> been consumed, so that if something fails we can tell the underlying 
> streaming system where to start reading when we re-create the reader. 
> This is why CheckpointMark is Serializable. E.g. this makes sense for 
> Kafka.
> 2) To do acks - to let the underlying streaming system know that the 
> Beam pipeline will never need data up to this CheckpointMark. Acking 
> does not require serializability - runners call ack() on the same 
> in-memory instance of CheckpointMark that was produced by the reader. 
> E.g. this makes sense for RabbitMq or Pubsub.
>
> In practice, these two capabilities tend to be mutually exclusive: 
> some streaming systems can provide a serializable CheckpointMark, some 
> can do acks, some can do neither - but very few (or none) can do both, 
> and it's debatable whether it even makes sense for a system to provide 
> both capabilities: usually acking is an implicit form of 
> streaming-system-side checkpointing, i.e. when you re-create the 
> reader you don't actually need to carry over any information from an 
> old CheckpointMark - the necessary state (which records should be 
> delivered) is maintained on the streaming system side.
>
> These two are lumped together into one API simply because that was the 
> best design option we came up with (not for lack of trying, but 
> suggestions very much welcome - AFAIK nobody is happy with it).
>
> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but 
> it can do acks. So you can simply ignore the non-serializability.
>
> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.robert@acm.org 
> <ma...@acm.org>> wrote:
>
>     (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x
>     library.
>     As part of this I switched to a pull-based API rather than the
>     previously-used push-based. This has caused some nebulous problems so
>     put up a correction PR that I think needs some eyes fairly quickly as
>     I'd consider master to be broken for rabbitmq right now. The PR keeps
>     the upgrade but reverts to the same push-based implementation as
>     in 4.x:
>     https://github.com/apache/beam/pull/9977 )
>
>     Regardless, in trying to get the pull-based API to work, I'm
>     finding the
>     interactions between rabbitmq and beam with CheckpointMark to be
>     fundamentally impossible to implement so I'm hoping for some input
>     here.
>
>     CheckointMark itself must be Serializable, presumably this means
>     it gets
>     shuffled around between nodes. However 'Channel', the tunnel through
>     which it communicates with Rabbit to ack messages and finalize the
>     checkpoint, is non-Serializable. Like most other CheckpointMark
>     implementations, Channel is 'transient'. When a new CheckpointMark is
>     instantiated, it's given a Channel. If an existing one is supplied to
>     the Reader's constructor (part of the 'startReader()' interface), the
>     channel is overwritten.
>
>     *However*, Rabbit does not support 'ack'ing messages on a channel
>     other
>     than the one that consumed them in the first place. Attempting to
>     do so
>     results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>     https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>
>     ).
>
>     Truthfully, I don't really understand how the current
>     implementation is
>     working; it seems like a happy accident. But I'm curious if someone
>     could help me debug and implement how to bridge the
>     re-usable/serializable CheckpointMark requirement in Beam with this
>     limitation of Rabbit.
>
>     Thanks,
>     -Daniel Robert
>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Eugene Kirpichov <jk...@google.com>.
On Fri, Nov 8, 2019 at 5:57 AM Daniel Robert <da...@acm.org> wrote:

> Thanks Eugene and Reuven.
>
> In response to Eugene, I'd like to confirm I have this correct: In the
> rabbit-style use case of "stream-system-side checkpointing", it is safe
> (and arguably the correct behavior) to ignore the supplied CheckpointMark
> argument in `createReader(options, checkpointmark)` and in the Reader's
> constructor, and instead always instantiate a new CheckpointMark during
> construction. Is that correct?
>
Yes, this is correct.
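
In code, that amounts to something like this on the source side
(illustrative names, not the actual RabbitMqIO classes):

    // Inside a hypothetical
    // UnboundedSource<RabbitMqMessage, RabbitMqCheckpointMark> subclass:
    @Override
    public UnboundedReader<RabbitMqMessage> createReader(
        PipelineOptions options, RabbitMqCheckpointMark checkpointMark)
        throws IOException {
      // The supplied mark (possibly decoded from a previous run) is ignored:
      // the broker already knows which messages are unacked and will
      // redeliver them. The reader opens its own channel and builds a fresh
      // mark.
      return new RabbitMqReader(this);
    }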


> In response to Reuven: noted, however I was mostly using serialization in
> the general sense. That is, there does not seem to be any means of
> deserializing a RabbitMqCheckpointMark such that it can continue to provide
> value to a runner. Whether it's java serialization, avro, or any other
> Coder, the 'channel' itself cannot "come along for the ride", which leaves
> the rest of the internal state mostly unusable except for perhaps some
> historical, immutable use case.
>
> -Danny
> On 11/8/19 2:01 AM, Reuven Lax wrote:
>
> Just to clarify one thing: CheckpointMark does not need to be Java
> Serializable. All that's needed is to return a Coder for the CheckpointMark
> in getCheckpointMarkCoder.
>
> On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov <jk...@google.com> wrote:
>
>> Hi Daniel,
>>
>> This is probably insufficiently well documented. The CheckpointMark is
>> used for two purposes:
>> 1) To persistently store some notion of how much of the stream has been
>> consumed, so that if something fails we can tell the underlying streaming
>> system where to start reading when we re-create the reader. This is why
>> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
>> 2) To do acks - to let the underlying streaming system know that the Beam
>> pipeline will never need data up to this CheckpointMark. Acking does not
>> require serializability - runners call ack() on the same in-memory instance
>> of CheckpointMark that was produced by the reader. E.g. this makes sense
>> for RabbitMq or Pubsub.
>>
>> In practice, these two capabilities tend to be mutually exclusive: some
>> streaming systems can provide a serializable CheckpointMark, some can do
>> acks, some can do neither - but very few (or none) can do both, and it's
>> debatable whether it even makes sense for a system to provide both
>> capabilities: usually acking is an implicit form of streaming-system-side
>> checkpointing, i.e. when you re-create the reader you don't actually need
>> to carry over any information from an old CheckpointMark - the necessary
>> state (which records should be delivered) is maintained on the streaming
>> system side.
>>
>> These two are lumped together into one API simply because that was the
>> best design option we came up with (not for lack of trying, but suggestions
>> very much welcome - AFAIK nobody is happy with it).
>>
>> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
>> can do acks. So you can simply ignore the non-serializability.
>>
>> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <da...@acm.org>
>> wrote:
>>
>>> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
>>> As part of this I switched to a pull-based API rather than the
>>> previously-used push-based. This has caused some nebulous problems so
>>> put up a correction PR that I think needs some eyes fairly quickly as
>>> I'd consider master to be broken for rabbitmq right now. The PR keeps
>>> the upgrade but reverts to the same push-based implementation as in 4.x:
>>> https://github.com/apache/beam/pull/9977 )
>>>
>>> Regardless, in trying to get the pull-based API to work, I'm finding the
>>> interactions between rabbitmq and beam with CheckpointMark to be
>>> fundamentally impossible to implement so I'm hoping for some input here.
>>>
>>> CheckointMark itself must be Serializable, presumably this means it gets
>>> shuffled around between nodes. However 'Channel', the tunnel through
>>> which it communicates with Rabbit to ack messages and finalize the
>>> checkpoint, is non-Serializable. Like most other CheckpointMark
>>> implementations, Channel is 'transient'. When a new CheckpointMark is
>>> instantiated, it's given a Channel. If an existing one is supplied to
>>> the Reader's constructor (part of the 'startReader()' interface), the
>>> channel is overwritten.
>>>
>>> *However*, Rabbit does not support 'ack'ing messages on a channel other
>>> than the one that consumed them in the first place. Attempting to do so
>>> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>>>
>>> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>>> ).
>>>
>>> Truthfully, I don't really understand how the current implementation is
>>> working; it seems like a happy accident. But I'm curious if someone
>>> could help me debug and implement how to bridge the
>>> re-usable/serializable CheckpointMark requirement in Beam with this
>>> limitation of Rabbit.
>>>
>>> Thanks,
>>> -Daniel Robert
>>>
>>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Daniel Robert <da...@acm.org>.
Thanks Eugene and Reuven.

In response to Eugene, I'd like to confirm I have this correct: In the 
rabbit-style use case of "stream-system-side checkpointing", it is safe 
(and arguably the correct behavior) to ignore the supplied 
CheckpointMark argument in `createReader(options, checkpointmark)` and 
in the Reader's constructor, and instead always instantiate a new 
CheckpointMark during construction. Is that correct?

In response to Reuven: noted; however, I was mostly using serialization
in the general sense. That is, there does not seem to be any means of
deserializing a RabbitMqCheckpointMark such that it can continue to
provide value to a runner. Whether it's Java serialization, Avro, or any
other Coder, the 'channel' itself cannot "come along for the ride",
which leaves the rest of the internal state mostly unusable except for
perhaps some historical, immutable use case.
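
As an illustration of what I mean, a round trip through a Coder keeps the
plain data but not the channel (hypothetical, stripped-down mark class):

    import java.io.Serializable;
    import com.rabbitmq.client.Channel;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.util.CoderUtils;

    public class CoderRoundTripSketch {
      // Hypothetical, stripped-down mark: some plain state plus a transient
      // channel.
      static class Mark implements Serializable {
        transient Channel channel;
        long latestDeliveryTag;
      }

      public static void main(String[] args) throws Exception {
        Mark original = new Mark();
        original.latestDeliveryTag = 42L;
        // Encode/decode through a Coder, as a runner does when it hands a
        // mark to a newly created reader.
        Mark restored =
            CoderUtils.clone(SerializableCoder.of(Mark.class), original);
        System.out.println(restored.latestDeliveryTag); // 42: data survives
        System.out.println(restored.channel);           // null: channel does not
      }
    }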

-Danny

On 11/8/19 2:01 AM, Reuven Lax wrote:
> Just to clarify one thing: CheckpointMark does not need to be Java 
> Serializable. All that's needed is to return a Coder for the 
> CheckpointMark in getCheckpointMarkCoder.
>
> On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov <jkff@google.com 
> <ma...@google.com>> wrote:
>
>     Hi Daniel,
>
>     This is probably insufficiently well documented. The
>     CheckpointMark is used for two purposes:
>     1) To persistently store some notion of how much of the stream has
>     been consumed, so that if something fails we can tell the
>     underlying streaming system where to start reading when we
>     re-create the reader. This is why CheckpointMark is Serializable.
>     E.g. this makes sense for Kafka.
>     2) To do acks - to let the underlying streaming system know that
>     the Beam pipeline will never need data up to this CheckpointMark.
>     Acking does not require serializability - runners call ack() on
>     the same in-memory instance of CheckpointMark that was produced by
>     the reader. E.g. this makes sense for RabbitMq or Pubsub.
>
>     In practice, these two capabilities tend to be mutually exclusive:
>     some streaming systems can provide a serializable CheckpointMark,
>     some can do acks, some can do neither - but very few (or none) can
>     do both, and it's debatable whether it even makes sense for a
>     system to provide both capabilities: usually acking is an implicit
>     form of streaming-system-side checkpointing, i.e. when you
>     re-create the reader you don't actually need to carry over any
>     information from an old CheckpointMark - the necessary state
>     (which records should be delivered) is maintained on the streaming
>     system side.
>
>     These two are lumped together into one API simply because that was
>     the best design option we came up with (not for lack of trying,
>     but suggestions very much welcome - AFAIK nobody is happy with it).
>
>     RabbitMQ is under #2 - it can't do serializable checkpoint marks,
>     but it can do acks. So you can simply ignore the non-serializability.
>
>     On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert
>     <daniel.robert@acm.org <ma...@acm.org>> wrote:
>
>         (Background: I recently upgraded RabbitMqIO from the 4.x to
>         5.x library.
>         As part of this I switched to a pull-based API rather than the
>         previously-used push-based. This has caused some nebulous
>         problems so
>         put up a correction PR that I think needs some eyes fairly
>         quickly as
>         I'd consider master to be broken for rabbitmq right now. The
>         PR keeps
>         the upgrade but reverts to the same push-based implementation
>         as in 4.x:
>         https://github.com/apache/beam/pull/9977 )
>
>         Regardless, in trying to get the pull-based API to work, I'm
>         finding the
>         interactions between rabbitmq and beam with CheckpointMark to be
>         fundamentally impossible to implement so I'm hoping for some
>         input here.
>
>         CheckointMark itself must be Serializable, presumably this
>         means it gets
>         shuffled around between nodes. However 'Channel', the tunnel
>         through
>         which it communicates with Rabbit to ack messages and finalize
>         the
>         checkpoint, is non-Serializable. Like most other CheckpointMark
>         implementations, Channel is 'transient'. When a new
>         CheckpointMark is
>         instantiated, it's given a Channel. If an existing one is
>         supplied to
>         the Reader's constructor (part of the 'startReader()'
>         interface), the
>         channel is overwritten.
>
>         *However*, Rabbit does not support 'ack'ing messages on a
>         channel other
>         than the one that consumed them in the first place. Attempting
>         to do so
>         results in a '406 (PRECONDITION-FAILED) - unknown delivery
>         tag'. (See
>         https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>
>         ).
>
>         Truthfully, I don't really understand how the current
>         implementation is
>         working; it seems like a happy accident. But I'm curious if
>         someone
>         could help me debug and implement how to bridge the
>         re-usable/serializable CheckpointMark requirement in Beam with
>         this
>         limitation of Rabbit.
>
>         Thanks,
>         -Daniel Robert
>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Reuven Lax <re...@google.com>.
Just to clarify one thing: CheckpointMark does not need to be Java
Serializable. All that's needed is to return a Coder for the CheckpointMark
in getCheckpointMarkCoder.
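
For example, if the mark happens to be Serializable anyway, the override
can be as small as this (sketch, illustrative mark class name):

    // In the UnboundedSource subclass, using
    // org.apache.beam.sdk.coders.SerializableCoder:
    @Override
    public Coder<RabbitMqCheckpointMark> getCheckpointMarkCoder() {
      return SerializableCoder.of(RabbitMqCheckpointMark.class);
    }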

On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov <jk...@google.com> wrote:

> Hi Daniel,
>
> This is probably insufficiently well documented. The CheckpointMark is
> used for two purposes:
> 1) To persistently store some notion of how much of the stream has been
> consumed, so that if something fails we can tell the underlying streaming
> system where to start reading when we re-create the reader. This is why
> CheckpointMark is Serializable. E.g. this makes sense for Kafka.
> 2) To do acks - to let the underlying streaming system know that the Beam
> pipeline will never need data up to this CheckpointMark. Acking does not
> require serializability - runners call ack() on the same in-memory instance
> of CheckpointMark that was produced by the reader. E.g. this makes sense
> for RabbitMq or Pubsub.
>
> In practice, these two capabilities tend to be mutually exclusive: some
> streaming systems can provide a serializable CheckpointMark, some can do
> acks, some can do neither - but very few (or none) can do both, and it's
> debatable whether it even makes sense for a system to provide both
> capabilities: usually acking is an implicit form of streaming-system-side
> checkpointing, i.e. when you re-create the reader you don't actually need
> to carry over any information from an old CheckpointMark - the necessary
> state (which records should be delivered) is maintained on the streaming
> system side.
>
> These two are lumped together into one API simply because that was the
> best design option we came up with (not for lack of trying, but suggestions
> very much welcome - AFAIK nobody is happy with it).
>
> RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
> can do acks. So you can simply ignore the non-serializability.
>
> On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <da...@acm.org>
> wrote:
>
>> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
>> As part of this I switched to a pull-based API rather than the
>> previously-used push-based. This has caused some nebulous problems so
>> put up a correction PR that I think needs some eyes fairly quickly as
>> I'd consider master to be broken for rabbitmq right now. The PR keeps
>> the upgrade but reverts to the same push-based implementation as in 4.x:
>> https://github.com/apache/beam/pull/9977 )
>>
>> Regardless, in trying to get the pull-based API to work, I'm finding the
>> interactions between rabbitmq and beam with CheckpointMark to be
>> fundamentally impossible to implement so I'm hoping for some input here.
>>
>> CheckointMark itself must be Serializable, presumably this means it gets
>> shuffled around between nodes. However 'Channel', the tunnel through
>> which it communicates with Rabbit to ack messages and finalize the
>> checkpoint, is non-Serializable. Like most other CheckpointMark
>> implementations, Channel is 'transient'. When a new CheckpointMark is
>> instantiated, it's given a Channel. If an existing one is supplied to
>> the Reader's constructor (part of the 'startReader()' interface), the
>> channel is overwritten.
>>
>> *However*, Rabbit does not support 'ack'ing messages on a channel other
>> than the one that consumed them in the first place. Attempting to do so
>> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>>
>> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
>> ).
>>
>> Truthfully, I don't really understand how the current implementation is
>> working; it seems like a happy accident. But I'm curious if someone
>> could help me debug and implement how to bridge the
>> re-usable/serializable CheckpointMark requirement in Beam with this
>> limitation of Rabbit.
>>
>> Thanks,
>> -Daniel Robert
>>
>>

Re: RabbitMQ and CheckpointMark feasibility

Posted by Eugene Kirpichov <jk...@google.com>.
Hi Daniel,

This is probably insufficiently well documented. The CheckpointMark is used
for two purposes:
1) To persistently store some notion of how much of the stream has been
consumed, so that if something fails we can tell the underlying streaming
system where to start reading when we re-create the reader. This is why
CheckpointMark is Serializable. E.g. this makes sense for Kafka.
2) To do acks - to let the underlying streaming system know that the Beam
pipeline will never need data up to this CheckpointMark. Acking does not
require serializability - runners call ack() on the same in-memory instance
of CheckpointMark that was produced by the reader. E.g. this makes sense
for RabbitMq or Pubsub.

In practice, these two capabilities tend to be mutually exclusive: some
streaming systems can provide a serializable CheckpointMark, some can do
acks, some can do neither - but very few (or none) can do both, and it's
debatable whether it even makes sense for a system to provide both
capabilities: usually acking is an implicit form of streaming-system-side
checkpointing, i.e. when you re-create the reader you don't actually need
to carry over any information from an old CheckpointMark - the necessary
state (which records should be delivered) is maintained on the streaming
system side.

These two are lumped together into one API simply because that was the best
design option we came up with (not for lack of trying, but suggestions very
much welcome - AFAIK nobody is happy with it).

RabbitMQ is under #2 - it can't do serializable checkpoint marks, but it
can do acks. So you can simply ignore the non-serializability.
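
To illustrate the contrast, a purpose-#1 mark is nothing but resume
positions; a sketch (hypothetical names, not Kafka's actual
implementation):

    import java.io.Serializable;
    import java.util.Map;
    import org.apache.beam.sdk.io.UnboundedSource;

    // Purpose #1: the mark carries resume positions; there is nothing to ack.
    class OffsetCheckpointMark
        implements UnboundedSource.CheckpointMark, Serializable {
      final Map<Integer, Long> nextOffsetPerPartition;

      OffsetCheckpointMark(Map<Integer, Long> nextOffsetPerPartition) {
        this.nextOffsetPerPartition = nextOffsetPerPartition;
      }

      @Override
      public void finalizeCheckpoint() {
        // Nothing to acknowledge: the offsets themselves are the checkpoint
        // and are handed back to createReader() on restore.
      }
    }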

On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <da...@acm.org> wrote:

> (Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
> As part of this I switched to a pull-based API rather than the
> previously-used push-based. This has caused some nebulous problems so
> put up a correction PR that I think needs some eyes fairly quickly as
> I'd consider master to be broken for rabbitmq right now. The PR keeps
> the upgrade but reverts to the same push-based implementation as in 4.x:
> https://github.com/apache/beam/pull/9977 )
>
> Regardless, in trying to get the pull-based API to work, I'm finding the
> interactions between rabbitmq and beam with CheckpointMark to be
> fundamentally impossible to implement so I'm hoping for some input here.
>
> CheckointMark itself must be Serializable, presumably this means it gets
> shuffled around between nodes. However 'Channel', the tunnel through
> which it communicates with Rabbit to ack messages and finalize the
> checkpoint, is non-Serializable. Like most other CheckpointMark
> implementations, Channel is 'transient'. When a new CheckpointMark is
> instantiated, it's given a Channel. If an existing one is supplied to
> the Reader's constructor (part of the 'startReader()' interface), the
> channel is overwritten.
>
> *However*, Rabbit does not support 'ack'ing messages on a channel other
> than the one that consumed them in the first place. Attempting to do so
> results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
>
> https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
> ).
>
> Truthfully, I don't really understand how the current implementation is
> working; it seems like a happy accident. But I'm curious if someone
> could help me debug and implement how to bridge the
> re-usable/serializable CheckpointMark requirement in Beam with this
> limitation of Rabbit.
>
> Thanks,
> -Daniel Robert
>
>