Posted to jira@kafka.apache.org by "Man Hin (Jira)" <ji...@apache.org> on 2020/05/07 17:30:00 UTC

[jira] [Updated] (KAFKA-9968) Newly subscribed topic not present in metadata request

     [ https://issues.apache.org/jira/browse/KAFKA-9968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Man Hin updated KAFKA-9968:
---------------------------
    Attachment: KafkaClientVersionTest.java

> Newly subscribed topic not present in metadata request
> ------------------------------------------------------
>
>                 Key: KAFKA-9968
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9968
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.5.0, 2.4.1
>            Reporter: Man Hin
>            Priority: Major
>         Attachments: KafkaClientVersionTest.java
>
>
> Our application subscribes to multiple topics one by one. It used to work fine, but after we upgraded our Kafka client from 2.4.0 to 2.4.1, our application no longer receives messages for the last topic it subscribes to.
> I spotted a warning log from the Kafka client.
> {code:java}
> 2020-05-08 01:01:11.059 [main] [WARN ] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] The following subscribed topics are not assigned to any members: [TopicX]  {code}
> I'm able to reproduce it with a test case running against a live Kafka broker (we are using a v2.4.1 broker).
> {code:java}
>     @Test
>     public void WHEN_subscribed_sequentially_THEN_receive_assignment() throws InterruptedException, ExecutionException, TimeoutException {
>         // WHEN
>         List<String> topics = new ArrayList<>();        
>         topics.add(TOPIC_C);
>         consumer.subscribe(topics);
>         consumer.poll(0);        
>         topics.add(TOPIC_B);
>         consumer.subscribe(topics);
>         consumer.poll(0);        
>         topics.add(TOPIC_A);
>         consumer.subscribe(topics);
>         consumer.poll(0);        
>         // THEN
>         Set<TopicPartition> assignments = consumer.assignment();
>         Set<String> topicSet = assignments.stream().map(p -> p.topic()).distinct().collect(Collectors.toSet());
>         logger.info("Topic: {}", topicSet);
>         assertThat(topicSet, hasItems(TOPIC_C, TOPIC_B, TOPIC_A));    
>     }    {code}
> We turned on trace logging and found that the metadata requests always omit the most recently subscribed topic.
> {code:java}
> 2020-05-08 01:01:10.665 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): TopicC
> 2020-05-08 01:01:10.983 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer clientId=sample_consumer, groupId=sample_client] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], allowAutoTopicCreation=true, includeClusterA
> 2020-05-08 01:01:11.003 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId=
> 2020-05-08 01:01:11.015 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId=
> 2020-05-08 01:01:11.049 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): TopicC, TopicB
> 2020-05-08 01:01:11.053 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer clientId=sample_consumer, groupId=sample_client] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], allowAutoTopicCreation=true, includeClusterA
> 2020-05-08 01:01:11.057 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId=
> 2020-05-08 01:01:11.062 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): TopicC, TopicB, TopicA
> 2020-05-08 01:01:11.062 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer clientId=sample_consumer, groupId=sample_client] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='TopicB'), MetadataRequestTopic(name='TopicC')], allowAu
> 2020-05-08 01:01:11.064 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId=
> {code}
> I suspect this is because, since 2.4.1, SubscriptionState.groupSubscription contains only the topics returned in the joinGroup response. As a result, the newly subscribed topic is left out of the metadata request.
> The behaviour up to 2.4.0 was to add the topics in the joinGroup response to groupSubscription, but in 2.4.1 this changed to replacing its contents. Maybe this is the cause. See SubscriptionState in [https://github.com/apache/kafka/commit/0a5dec0b3a3e1b027230ba766fee0c08b70cc63c|https://github.com/apache/kafka/commit/0a5dec0b3a3e1b027230ba766fee0c08b70cc63c].
> I tried client v2.5.0 and got the same result.
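
The suspected add-versus-replace difference can be illustrated with a small standalone model. This is only a sketch under stated assumptions and does not use or reproduce the Kafka client code: the class GroupSubscriptionSketch, its Model type, and the JoinPolicy enum are hypothetical names. It assumes that metadata requests are built from the group topic set once that set is non-empty, that the accumulating variant also merges locally subscribed topics into the group set, and that the join response echoes the single member's subscription.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class GroupSubscriptionSketch {

    /** How the hypothetical model maintains its group topic set. */
    enum JoinPolicy { ACCUMULATE, REPLACE }

    /** Hypothetical, simplified stand-in for the client's subscription bookkeeping. */
    static final class Model {
        private final JoinPolicy policy;
        private Set<String> subscription = new TreeSet<>();
        private Set<String> groupSubscription = new TreeSet<>();

        Model(JoinPolicy policy) {
            this.policy = policy;
        }

        void subscribe(Collection<String> topics) {
            subscription = new TreeSet<>(topics);
            if (policy == JoinPolicy.ACCUMULATE) {
                // Assumption of this model: local subscriptions are merged into the group set.
                groupSubscription.addAll(topics);
            }
        }

        void onJoinGroupResponse(Collection<String> topics) {
            if (policy == JoinPolicy.REPLACE) {
                // The group set becomes exactly what the join response reported.
                groupSubscription = new TreeSet<>(topics);
            } else {
                groupSubscription.addAll(topics);
            }
        }

        /** Assumption: metadata is requested for the group set once it is non-empty. */
        Set<String> metadataTopics() {
            return new TreeSet<>(groupSubscription.isEmpty() ? subscription : groupSubscription);
        }
    }

    /** Replays the subscribe sequence from the test case and records each metadata topic set. */
    static List<Set<String>> simulate(JoinPolicy policy) {
        Model model = new Model(policy);
        List<Set<String>> requested = new ArrayList<>();
        List<String> topics = new ArrayList<>();
        for (String topic : Arrays.asList("TopicC", "TopicB", "TopicA")) {
            topics.add(topic);
            model.subscribe(topics);
            requested.add(model.metadataTopics());
            // Assumption: the join response echoes this single member's subscription.
            model.onJoinGroupResponse(topics);
        }
        return requested;
    }

    public static void main(String[] args) {
        for (JoinPolicy policy : JoinPolicy.values()) {
            System.out.println(policy + ": " + simulate(policy));
        }
    }
}
```

Under these assumptions, the REPLACE variant requests metadata for [TopicC], [TopicC], and [TopicB, TopicC], which matches the sequence in the trace log above, while the ACCUMULATE variant requests [TopicC], [TopicB, TopicC], and [TopicA, TopicB, TopicC]. This only shows that the hypothesis is consistent with the observed log; it does not confirm the root cause.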


