Posted to user@flink.apache.org by Gabriel Candal <ga...@gmail.com> on 2019/03/05 14:07:06 UTC

RMQSource synchronous message ack

Hi,

I recently opened a Stack Overflow question
<https://stackoverflow.com/questions/54909315/why-does-checkpointing-impact-latency-so-much>
about latency spikes (~500 ms) after a checkpoint operation, even though
the checkpoint itself was relatively fast (~50 ms).

I've come to realize that the latency was caused by the job waiting for
RMQSource to acknowledge the session IDs (acknowledgeSessionIDs) during
notifyCheckpointComplete.
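A simplified model of the synchronous pattern described above may make the cost easier to see. It is a sketch under assumptions, not Flink's RMQSource code: the classes SyncAckingSource and SyncAckDemo, the methods onMessage and snapshotState, and the ackChannel callback (a stand-in for the broker's blocking ack call) are all hypothetical. Delivery tags are grouped per checkpoint and acknowledged on the calling thread once notifyCheckpointComplete arrives, so that call cannot return until every ack has gone through.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongConsumer;

/**
 * Simplified model (hypothetical, not Flink's RMQSource) of a source that
 * acknowledges broker messages only after the checkpoint covering them completes.
 */
class SyncAckingSource {
    // Hypothetical stand-in for the broker's blocking ack call.
    private final LongConsumer ackChannel;
    // Delivery tags received since the last snapshot.
    private List<Long> idsSinceLastSnapshot = new ArrayList<>();
    // Tags captured at each snapshot, keyed by checkpoint id.
    private final TreeMap<Long, List<Long>> pendingByCheckpoint = new TreeMap<>();

    SyncAckingSource(LongConsumer ackChannel) {
        this.ackChannel = ackChannel;
    }

    void onMessage(long deliveryTag) {
        idsSinceLastSnapshot.add(deliveryTag);
    }

    void snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, idsSinceLastSnapshot);
        idsSinceLastSnapshot = new ArrayList<>();
    }

    // Acks every tag from checkpoints <= checkpointId on the calling thread,
    // so the caller is blocked for the full duration of all ack calls.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<Long>> done = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<Long> ids : done.values()) {
            for (long id : ids) {
                ackChannel.accept(id);
            }
        }
        done.clear(); // the headMap view removes these entries from pendingByCheckpoint
    }
}

class SyncAckDemo {
    public static void main(String[] args) {
        List<Long> acked = new ArrayList<>();
        SyncAckingSource source = new SyncAckingSource(id -> acked.add(id));
        source.onMessage(1);
        source.onMessage(2);
        source.snapshotState(1);   // checkpoint 1 covers tags 1 and 2
        source.onMessage(3);
        source.notifyCheckpointComplete(1);
        System.out.println(acked); // prints [1, 2]; tag 3 waits for the next checkpoint
    }
}
```

With a real broker each ack is a network round trip, so in this model the time spent inside notifyCheckpointComplete grows with the number of messages received since the previous checkpoint, which could account for a spike much longer than the checkpoint itself.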

I've noticed that the Kafka connectors do the equivalent operation
(committing offsets) asynchronously, at least from the 0.9 connector
onwards. My question to you is: can you see any reason why this
acknowledgement has to be synchronous for RabbitMQ?

I believe it should be OK, given that those messages are already reflected
in the checkpointed state, but I'm not sure whether there are any negative
consequences correctness-wise.
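For comparison, here is a minimal sketch of handing the same acknowledgements to a background thread, loosely in the spirit of how the Kafka connector commits offsets asynchronously; it is not that connector's code. AsyncAcker, ackAsync, AsyncAckDemo and the ackChannel callback are hypothetical names. A single worker thread keeps the acks in submission order and, on the assumption that the broker channel should not be used from several threads at once, confines it to one thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

/** Hypothetical sketch: acks are queued to one background thread. */
class AsyncAcker implements AutoCloseable {
    private final LongConsumer ackChannel; // hypothetical blocking ack call
    // One worker: acks run in submission order and the channel stays on one thread.
    private final ExecutorService ackThread = Executors.newSingleThreadExecutor();

    AsyncAcker(LongConsumer ackChannel) {
        this.ackChannel = ackChannel;
    }

    // Intended to be called from notifyCheckpointComplete: enqueue and return at once.
    Future<?> ackAsync(List<Long> deliveryTags) {
        List<Long> tags = new ArrayList<>(deliveryTags); // defensive copy
        return ackThread.submit(() -> tags.forEach(tag -> ackChannel.accept(tag)));
    }

    // Stop accepting new acks, then wait for the queued ones to finish.
    @Override
    public void close() {
        ackThread.shutdown();
        try {
            if (!ackThread.awaitTermination(10, TimeUnit.SECONDS)) {
                ackThread.shutdownNow(); // give up on acks that did not finish in time
            }
        } catch (InterruptedException e) {
            ackThread.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

class AsyncAckDemo {
    public static void main(String[] args) throws Exception {
        // Simulated slow broker: each ack takes about 100 ms.
        LongConsumer slowAck = id -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("acked " + id);
        };
        try (AsyncAcker acker = new AsyncAcker(slowAck)) {
            long start = System.nanoTime();
            Future<?> done = acker.ackAsync(List.of(1L, 2L, 3L));
            long returnedAfterMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println("ackAsync returned after " + returnedAfterMs + " ms");
            done.get(); // only the demo waits here; a real source would not
        }
    }
}
```

The timing printed by the demo depends on the machine, so no output is shown; the point is that ackAsync returns long before the three simulated 100 ms acks have finished.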

Thanks,

Re: RMQSource synchronous message ack

Posted by gcandal <ga...@gmail.com>.
First of all, thanks for your time and quick response.

I'm not completely sure I understood your example, but is this what you
mean:

- Sink processes A, B, C
- Checkpoint persisted with A, B, C
- Notify checkpoint starts
- Notify checkpoint ACKs A
- Notify checkpoint ACKs B
- Job crashes
- Job resumes from checkpoint with A, B, C
- Job will re-process C because it was not ACK'ed

Was this it?
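The crash sequence in the list above can be simulated with a toy broker that redelivers unacknowledged messages after a consumer failure. Everything here is hypothetical (ToyBroker, CrashDemo, string message IDs), and the dedup branch assumes each message carries a unique ID that is also kept in checkpointed state. Without that check, C is processed twice after the restore.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy broker (hypothetical): unacked messages are redelivered after a consumer failure. */
class ToyBroker {
    private final Deque<String> ready = new ArrayDeque<>();
    private final List<String> unacked = new ArrayList<>();

    void publish(String msg) {
        ready.add(msg);
    }

    String deliver() {
        String msg = ready.poll();
        if (msg != null) {
            unacked.add(msg);
        }
        return msg;
    }

    void ack(String msg) {
        unacked.remove(msg);
    }

    // On consumer failure, unacked messages go back to the front of the queue.
    void consumerFailed() {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            ready.addFirst(unacked.get(i));
        }
        unacked.clear();
    }
}

class CrashDemo {
    // Returns the job's output after crash and restore.
    static List<String> run(boolean dedupById) {
        ToyBroker broker = new ToyBroker();
        for (String m : List.of("A", "B", "C")) {
            broker.publish(m);
        }
        List<String> output = new ArrayList<>();
        Set<String> seenIds = new HashSet<>();
        String msg;
        while ((msg = broker.deliver()) != null) { // sink processes A, B, C
            output.add(msg);
            seenIds.add(msg);
        }
        // Checkpoint persisted with A, B, C.
        List<String> checkpointOutput = new ArrayList<>(output);
        Set<String> checkpointIds = new HashSet<>(seenIds);

        broker.ack("A");         // notify checkpoint ACKs A
        broker.ack("B");         // notify checkpoint ACKs B
        broker.consumerFailed(); // job crashes before C is acked

        // Job resumes from the checkpoint; the broker redelivers C.
        output = new ArrayList<>(checkpointOutput);
        seenIds = new HashSet<>(checkpointIds);
        while ((msg = broker.deliver()) != null) {
            if (dedupById && seenIds.contains(msg)) {
                continue; // already reflected in state; a real source would still ack it later
            }
            output.add(msg);
            seenIds.add(msg);
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [A, B, C, C]
        System.out.println(run(true));  // [A, B, C]
    }
}
```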

In a non-failure scenario (e.g. asking the job to stop), the job wouldn't
finish before the notification is complete, even though it's asynchronous:

- Sink processes A, B, C
- Checkpoint persisted with A, B, C
- Notify checkpoint starts
- Notify checkpoint ACKs A
- Notify checkpoint ACKs B
- Job asked to stop
- Job waiting for notify to end
- Notify checkpoint ACKs C
- Job stops

This seems to be the behaviour of Kafka09Fetcher + KafkaConsumerThread, or
is there anything I'm overlooking?
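The graceful-stop sequence can be sketched with a single-threaded ExecutorService standing in for the background ack thread; this is an assumption for illustration, not the actual Kafka09Fetcher/KafkaConsumerThread code, and GracefulStopDemo and sleepQuietly are hypothetical names. ExecutorService.shutdown() rejects new submissions but lets already-queued tasks run, and awaitTermination blocks until they finish, so the acks for A, B and C all complete before "job stopped" is recorded.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class GracefulStopDemo {
    // Records events in the order they happen and returns them.
    static List<String> run() throws InterruptedException {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService ackThread = Executors.newSingleThreadExecutor();
        for (String msg : List.of("A", "B", "C")) {
            ackThread.submit(() -> {
                sleepQuietly(50); // simulated broker round trip
                events.add("ack " + msg);
            });
        }
        events.add("stop requested");
        ackThread.shutdown(); // no new acks, but queued ones still run
        boolean drained = ackThread.awaitTermination(5, TimeUnit.SECONDS);
        events.add(drained ? "job stopped" : "job stopped with acks pending");
        return events;
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

Calling shutdownNow() instead would interrupt the worker and hand back the acks that never started; those messages would stay unacknowledged and the broker would redeliver them, as in the crash scenario from the first list.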

Re: RMQSource synchronous message ack

Posted by Chesnay Schepler <ch...@apache.org>.
The acknowledgement has to be synchronous since Flink assumes that after
notifyCheckpointComplete() all data has been persisted to external
systems. For example, if records 1 to 100 were passed to the sink and a
checkpoint occurs and completes, on restart Flink would continue with
record 101. But if the sink does not synchronously wait for all updates
to be persisted, the checkpoint may finish while an asynchronous update
(say, for record 99) is still pending; if that update is then lost, Flink
will _still_ resume from record 101, and record 99 never reaches the
external system.
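The record 99 example can be reduced to a toy model: a sink whose write for record 99 is still in flight when the checkpoint completes, followed by a crash. LossDemo and its variables are hypothetical, and the "external system" is just a set of record numbers. Record 99 survives the restore from record 101 only when pending writes are flushed before the checkpoint is allowed to complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical toy model of a sink with asynchronous writes. */
class LossDemo {
    // Returns the records held by the external system after crash and restore.
    static TreeSet<Integer> run(boolean flushOnCheckpoint) {
        TreeSet<Integer> external = new TreeSet<>(); // what the external system holds
        List<Integer> inFlight = new ArrayList<>();   // async writes not yet persisted

        for (int record = 1; record <= 100; record++) {
            if (record == 99) {
                inFlight.add(record); // this async write has not completed yet
            } else {
                external.add(record);
            }
        }
        if (flushOnCheckpoint) {
            external.addAll(inFlight); // block until pending writes are persisted
            inFlight.clear();
        }
        int resumeFrom = 101; // checkpoint completes: the next record to read is 101

        inFlight.clear();     // crash: anything still in flight is lost

        for (int record = resumeFrom; record <= 105; record++) {
            external.add(record); // restore replays from the checkpointed position
        }
        return external;
    }

    public static void main(String[] args) {
        System.out.println("async, record 99 present: " + run(false).contains(99)); // false
        System.out.println("flush, record 99 present: " + run(true).contains(99));  // true
    }
}
```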
