Posted to jira@kafka.apache.org by "Joost van de Wijgerd (Jira)" <ji...@apache.org> on 2020/05/12 16:38:00 UTC

[jira] [Comment Edited] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager

    [ https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105573#comment-17105573 ] 

Joost van de Wijgerd edited comment on KAFKA-9953 at 5/12/20, 4:37 PM:
-----------------------------------------------------------------------

[~guozhang] our scenario is the following: 

we have implemented a CQRS/EventSourcing framework on top of Kafka using transactions, so we have a couple of ConsumerRecord types that we send around the system: 

1) Commands

2) CommandResponses

3) DomainEvents

As all these types have different deserializers, it seemed logical to us to create different components to handle them. Every component (CommandServer, CommandClient, DomainEventListener) has its own consumer (group). We then have one thread (with one transactional KafkaProducer linked to it) that polls all these consumers in sequence, starts a transaction if there is data, and then commits it.
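As a rough plain-Java sketch of that single-threaded loop (the Txn interface and the Supplier-based consumers below are stand-ins I invented for the transactional KafkaProducer and the KafkaConsumer.poll() calls; they are not actual Kafka APIs):

```java
import java.util.List;
import java.util.function.Supplier;

// Stand-in for the transactional KafkaProducer's transaction calls.
interface Txn {
    void begin();
    void commit();
}

class PollLoop {
    // One iteration of the loop: poll every consumer in sequence, start a
    // transaction lazily when the first consumer returns data, and commit
    // once at the end so all groups' offsets land in the same transaction.
    // Returns true when a transaction was started and committed.
    static boolean pollOnce(List<Supplier<Integer>> consumers, Txn txn) {
        boolean inTxn = false;
        for (Supplier<Integer> consumer : consumers) {
            int records = consumer.get();   // stand-in for consumer.poll(...)
            if (records > 0 && !inTxn) {
                txn.begin();                // start the txn on first data
                inTxn = true;
            }
            // ... process records, produce responses, and add this
            // consumer group's offsets to the transaction ...
        }
        if (inTxn) {
            txn.commit();                   // one commit covering all groups
        }
        return inTxn;
    }
}
```

In the real loop, the "add offsets" step is producer.sendOffsetsToTransaction(...) per consumer group, which is exactly where the coordinator lookup discussed below happens.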

We partition our data on userId and have a significant number of partitions per topic to ensure sufficient throughput and scalability.

In our model we publish multiple DomainEvent types to the same topic (we have roughly 220 different DomainEvent types, so creating a partitioned topic for each didn't seem like the right solution), and we sometimes need to read a different event type in one of our services. In that case it makes sense to create a new consumer (group) on the same topic that picks out that particular event type. If we had had only one consumer (group), this wouldn't have been possible.
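As a hypothetical illustration of that filtering step (DomainEventRecord and the event-type strings below are invented for the example, not taken from our codebase), a dedicated listener can simply drop every deserialized record whose type it does not handle:

```java
import java.util.List;
import java.util.stream.Collectors;

// Invented stand-in for a deserialized record carrying its DomainEvent type.
record DomainEventRecord(String eventType, String payload) {}

class DomainEventFilter {
    private final String wantedType;

    DomainEventFilter(String wantedType) {
        this.wantedType = wantedType;
    }

    // Keep only the records of the event type this listener cares about;
    // everything else on the shared topic is discarded.
    List<DomainEventRecord> filter(List<DomainEventRecord> polled) {
        return polled.stream()
                     .filter(r -> wantedType.equals(r.eventType()))
                     .collect(Collectors.toList());
    }
}
```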

At the time we designed this, it seemed to make the most sense from an organizational perspective; of course, we didn't realize back then that the Kafka client library was designed around a one-to-one or one-to-many relationship between consumers and producers. However, nothing stops anybody from taking this approach, and I think it is straightforward for the TransactionManager to support it (as I showed in my PR), so why not support it?

Anyway, I hope this makes sense and you will consider the PR.
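For illustration, here is a minimal plain-Java sketch of the idea behind the patch, assuming the TransactionManager tracked one coordinator per consumerGroupId instead of a single consumerGroupCoordinator field. The class and method names (CoordinatorCache, handleFindCoordinatorResponse, etc.) are invented for this sketch and do not reflect the actual Kafka internals:

```java
import java.util.HashMap;
import java.util.Map;

// Invented sketch: one coordinator entry per consumer group, so offsets for
// different groups never evict each other's cached coordinator node.
class CoordinatorCache {
    // consumerGroupId -> broker node id of that group's coordinator
    private final Map<String, Integer> coordinators = new HashMap<>();

    // Remember the node discovered via a FindCoordinatorRequest.
    void handleFindCoordinatorResponse(String groupId, int nodeId) {
        coordinators.put(groupId, nodeId);
    }

    // Returns null when no coordinator is cached yet, i.e. a
    // FindCoordinatorRequest is still needed for this group.
    Integer coordinatorFor(String groupId) {
        return coordinators.get(groupId);
    }

    // Drop a stale entry, e.g. after a NOT_COORDINATOR style error.
    void invalidate(String groupId) {
        coordinators.remove(groupId);
    }
}
```

With a map like this, committing offsets for group A and then group B would no longer force a failed commit plus a coordinator re-discovery on every alternation.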



> support multiple consumerGroupCoordinators in TransactionManager
> ----------------------------------------------------------------
>
>                 Key: KAFKA-9953
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9953
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.5.0
>            Reporter: Joost van de Wijgerd
>            Priority: Major
>         Attachments: KAFKA-9953.patch
>
>
> We are using Kafka with a transactional producer and have the following use case:
> 3 KafkaConsumers (each with its own consumer group) polled by the same thread, plus 1 transactional KafkaProducer. When we add the offsets to the transaction, we run into the following problem:
> The TransactionManager only keeps track of 1 consumerGroupCoordinator. However, some consumerGroupCoordinators may live on another node, so we constantly see the TransactionManager switching between nodes. This has the overhead of 1 failing _TxnOffsetCommitRequest_ and 1 unnecessary _FindCoordinatorRequest_ per switch.
> Also, with _retry.backoff.ms_ set to 100 by default, this causes a pause of 100 ms for every other transaction (depending on which KafkaConsumer triggered the transaction, of course).
> If the TransactionManager could keep track of coordinator nodes per consumerGroupId, this problem would be solved.
> I already have a patch for this but still need to test it. I will add it to the ticket when that is done.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)