You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Aljoscha Krettek (Jira)" <ji...@apache.org> on 2020/12/17 08:28:00 UTC

[jira] [Commented] (FLINK-20628) Port RabbitMQ Sources to FLIP-27 API

    [ https://issues.apache.org/jira/browse/FLINK-20628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17250896#comment-17250896 ] 

Aljoscha Krettek commented on FLINK-20628:
------------------------------------------

So in this case splits would not contain a "real" split because all the readers would essentially subscribe to the same logical thing?

> Port RabbitMQ Sources to FLIP-27 API
> ------------------------------------
>
>                 Key: FLINK-20628
>                 URL: https://issues.apache.org/jira/browse/FLINK-20628
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors/ RabbitMQ
>            Reporter: Jan Westphal
>            Priority: Major
>             Fix For: 1.12.0
>
>
> *Structure*
> The new RabbitMQ Source will have three components:
>  * RabbitMQ enumerator that receives one RabbitMQ Channel Config.
>  * RabbitMQ splits contain the RabbitMQ Channel Config
>  * RabbitMQ Readers which subscribe to the same RabbitMQ channel and receive the messages (automatically load balanced by RabbitMQ).
> *Checkpointing Enumerators*
> The enumerator only needs to checkpoint the RabbitMQ channel config since the continuous discovery of new unread/unhandled messages is taken care of by the subscribed RabbitMQ readers and RabbitMQ itself.
> *Checkpointing Readers*
> The new RabbitMQ Source needs to ensure that every reader can be checkpointed.
> Since RabbitMQ is non-persistent and cannot be read by offset, a combined usage of checkpoints and message acknowledgments is necessary. Until a received message is checkpointed by a reader, it will stay in an un-acknowledge state. As soon as the checkpoint is created, the messages from the last checkpoint can be acknowledged as handled against RabbitMQ and thus will be deleted only then. Messages need to be acknowledged one by one as messages are handled by each SourceReader individually.
> When deserializing the messages we will make use of the implementation in the existing RabbitMQ Source.
> *Message Delivery Guarantees* 
> Unacknowledged messages of a reader will be redelivered by RabbitMQ automatically to other consumers of the same channel if the reader goes down.
>  
> This Source is going to only support at-least-once as this is the default RabbitMQ behavior and thus everything else would require changes to RabbitMQ itself or would impair the idea of parallelizing SourceReaders.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)