Posted to jira@kafka.apache.org by "Ramkumar (JIRA)" <ji...@apache.org> on 2017/11/29 20:27:00 UTC

[jira] [Commented] (KAFKA-6227) Kafka 0.11.01 New consumer - multiple consumers under same group not working as expected

    [ https://issues.apache.org/jira/browse/KAFKA-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271488#comment-16271488 ] 

Ramkumar commented on KAFKA-6227:
---------------------------------



I think I found the Kafka behavior that may be causing the issue. Please let me know whether this is expected behavior.

 

1. If consumer 1 is continuously polling (consumer.poll in a while
loop) on its active Kafka consumer connection, Kafka lets consumer 2
make a new connection under the same group and poll the topic
(rebalancing works fine). That is, as long as the existing consumer connections are
actively polling, Kafka allows a new consumer to be added under the same group.

 

2. If consumer 1 is idle (not polling, and its Kafka connection
not closed) and consumer 2 then attempts to connect to the same group, Kafka
doesn’t let consumer 2 poll on its new connection. Below are the
logs from Kafka for this scenario.

 

Consumer 1 made a connection but did not close it, and consumer.poll has stopped

[2017-11-29 14:17:07,988] INFO [GroupCoordinator 0]: Preparing to rebalance group T with old generation 694 (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:17:08,188] INFO [GroupCoordinator 0]: Stabilized group T generation 695 (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:17:08,200] INFO [GroupCoordinator 0]: Assignment received from leader for group T for generation 695 (kafka.coordinator.group.GroupCoordinator)

Consumer 2 now attempts a connection while consumer 1's connection is open but not polling

[2017-11-29 14:17:12,535] INFO [GroupCoordinator 0]: Preparing to rebalance group T with old generation 695 (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:20:11,195] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Consumer 2 waited for 5 minutes (the max.poll.interval.ms default value) and then returned with a coordination failure

[2017-11-29 14:22:12,515] INFO [GroupCoordinator 0]: Stabilized group T generation 696 (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:18,799] INFO [GroupCoordinator 0]: Member consumer-1-57238694-c83b-41a9-a87c-bccc2a0b9069 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:18,801] INFO [GroupCoordinator 0]: Preparing to rebalance group T with old generation 696 (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,517] INFO [GroupCoordinator 0]: Member consumer-2-424381db-6027-4adb-8f9a-16c5b491449f in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,518] INFO [GroupCoordinator 0]: Member consumer-2-e37fc919-80c3-435f-956b-73dc4eebb1fa in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,518] INFO [GroupCoordinator 0]: Member consumer-2-d655c46a-ce8c-45f6-b576-98d034dab137 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,519] INFO [GroupCoordinator 0]: Member consumer-2-291c0eb4-f1e5-423f-9925-ba72ee5124ab in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,520] INFO [GroupCoordinator 0]: Member consumer-2-05616962-fa32-4259-a5e1-96f8701c4694 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,520] INFO [GroupCoordinator 0]: Member consumer-2-d668f234-ca8e-4448-9851-d4b46728698c in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,521] INFO [GroupCoordinator 0]: Member consumer-2-25a337a7-d445-4675-b822-e68cde2475ad in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,522] INFO [GroupCoordinator 0]: Member consumer-2-cea6f287-7a24-468f-a835-4689c7531c51 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,522] INFO [GroupCoordinator 0]: Member consumer-2-6cd98d97-6b92-4a32-99d7-13abd78abf7b in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,523] INFO [GroupCoordinator 0]: Member consumer-2-9b48850d-1504-48d8-9e2b-397981421150 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,523] INFO [GroupCoordinator 0]: Member consumer-2-59d03235-c2dc-44f7-9664-e4c3aea3f170 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,524] INFO [GroupCoordinator 0]: Member consumer-2-a421db03-7efa-4bdb-a133-2360319a7951 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,524] INFO [GroupCoordinator 0]: Member consumer-2-48054b6b-ea44-44c9-a73e-b4865d24919e in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,525] INFO [GroupCoordinator 0]: Member consumer-2-756a9ff4-2c6d-4ac7-8582-ae01469ad4ff in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,526] INFO [GroupCoordinator 0]: Stabilized group T generation 697 (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:22,529] INFO [GroupCoordinator 0]: Assignment received from leader for group T for generation 697 (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:32,532] INFO [GroupCoordinator 0]: Member consumer-2-4670c7c1-5d05-4e3f-8b0d-f8c4e5f089c1 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:32,533] INFO [GroupCoordinator 0]: Preparing to rebalance group T with old generation 697 (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:22:32,534] INFO [GroupCoordinator 0]: Group T with generation 698 is now empty (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)

[2017-11-29 14:30:11,253] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
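
As a rough check of the 5-minute figure, the gap between the "Preparing to rebalance ... old generation 695" entry (14:17:12,535) and the "Stabilized group T generation 696" entry (14:22:12,515) can be computed directly from the log timestamps. The sketch below is only illustrative arithmetic: the class name RebalanceWait and the helper elapsedMillis are made up for this example, and 300000 ms is simply the 5-minute max.poll.interval.ms default mentioned above, not a value read from the broker or consumer configuration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper class, not part of Kafka.
class RebalanceWait {
    // Timestamp layout used in the broker log entries quoted above.
    static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Milliseconds elapsed between two log timestamps.
    static long elapsedMillis(String start, String end) {
        LocalDateTime from = LocalDateTime.parse(start, LOG_TS);
        LocalDateTime to = LocalDateTime.parse(end, LOG_TS);
        return Duration.between(from, to).toMillis();
    }

    public static void main(String[] args) {
        // "Preparing to rebalance" (old generation 695) -> "Stabilized" (generation 696)
        long waited = elapsedMillis("2017-11-29 14:17:12,535", "2017-11-29 14:22:12,515");
        // Assumed: the 5-minute default of max.poll.interval.ms, in milliseconds.
        long maxPollIntervalMs = 300_000L;
        System.out.println("rebalance wait (ms): " + waited);                        // 299980
        System.out.println("short of 5 minutes by (ms): " + (maxPollIntervalMs - waited)); // 20
    }
}
```

The rebalance stayed open for 299980 ms, i.e. 20 ms short of 5 minutes, which is consistent with the coordinator waiting for the idle consumer 1 up to the max.poll.interval.ms limit before completing the rebalance.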

 

 



> Kafka 0.11.01 New consumer - multiple consumers under same group not working as expected
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6227
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6227
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.11.0.1
>            Reporter: Ramkumar
>
> In Kafka 0.8 high-level consumers, the consumer.id under a group.id differentiates the consumer connections and manages the rebalancing of partitions in ZooKeeper. Our service uses this logic and keeps the Kafka stream connection in a cache (a ConcurrentHashMap), so that consecutive HTTP client connections don’t have to make a stream connection but take it from the cache and read off the messages. This also lets multiple consumers under the same group.id connect to Kafka simultaneously and read off the messages (load balancing).
> In Kafka 0.11.0.1, with the new consumer API, the design has changed. The consumer.id property is no longer supported and the connections with ZooKeeper are managed by Kafka itself. When 2 consumer instances under the same group attempt to make a connection simultaneously, one connection waits in the consumer.poll() method until the other one (which is already active) drops the connection. http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html That is, at any point in time only one active consumer instance is able to poll messages from the topic. This would slightly change the behavior of our service, in that we would have to restrict it to only one consumer connection per group per topic. That is, we couldn’t hold the connection in the cache if multiple consumers under the same group need to use Kafka.
> I couldn’t find any properties that help make multiple consumer connections on the same group.
> Manual partition assignment may be a workaround, but it is far too complex to handle in a service, because the service would need to track the consumer connections, assign the partitions of the topic and do the rebalancing (what the Kafka 0.8 high-level consumer originally does). Unfortunately we cannot use the legacy APIs in Kafka 0.11, since the documentation refers to performance degradation when using the old APIs in new versions.
> Has a solution been devised for migrating the high-level consumer of Kafka 0.8 without any change to the behavior from the user's perspective?


