Posted to jira@kafka.apache.org by "Man Hin (Jira)" <ji...@apache.org> on 2020/05/07 17:27:00 UTC

[jira] [Created] (KAFKA-9968) Newly subscribed topic not present in metadata request

Man Hin created KAFKA-9968:
------------------------------

             Summary: Newly subscribed topic not present in metadata request
                 Key: KAFKA-9968
                 URL: https://issues.apache.org/jira/browse/KAFKA-9968
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.4.1, 2.5.0
            Reporter: Man Hin


Our application subscribes to multiple topics one by one. It used to work fine, but after we upgraded our Kafka client from 2.4.0 to 2.4.1, the application no longer receives messages for the last topic it subscribes to.

I spotted this warning in the Kafka client log:
{code:java}
2020-05-08 01:01:11.059 [main] [WARN ] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] The following subscribed topics are not assigned to any members: [TopicX]  {code}
I'm able to reproduce it with a test case running against a live Kafka broker (we are using v2.4.1 broker).
{code:java}
    @Test
    public void WHEN_subscribed_sequentially_THEN_receive_assignment() throws InterruptedException, ExecutionException, TimeoutException {
        // WHEN
        List<String> topics = new ArrayList<>();        

        topics.add(TOPIC_C);
        consumer.subscribe(topics);
        consumer.poll(0);        

        topics.add(TOPIC_B);
        consumer.subscribe(topics);
        consumer.poll(0);        

        topics.add(TOPIC_A);
        consumer.subscribe(topics);
        consumer.poll(0);        

        // THEN
        Set<TopicPartition> assignments = consumer.assignment();
        Set<String> topicSet = assignments.stream().map(p -> p.topic()).distinct().collect(Collectors.toSet());
        logger.info("Topic: {}", topicSet);
        assertThat(topicSet, hasItems(TOPIC_C, TOPIC_B, TOPIC_A));    
    }    {code}
We turned on trace logging and found that each metadata request omits the most recently subscribed topic.
{code:java}
2020-05-08 01:01:10.665 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): TopicC
2020-05-08 01:01:10.983 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer clientId=sample_consumer, groupId=sample_client] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], allowAutoTopicCreation=true, includeClusterA
2020-05-08 01:01:11.003 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId=
2020-05-08 01:01:11.015 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId=
2020-05-08 01:01:11.049 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): TopicC, TopicB
2020-05-08 01:01:11.053 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer clientId=sample_consumer, groupId=sample_client] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], allowAutoTopicCreation=true, includeClusterA
2020-05-08 01:01:11.057 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId=
2020-05-08 01:01:11.062 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): TopicC, TopicB, TopicA
2020-05-08 01:01:11.062 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer clientId=sample_consumer, groupId=sample_client] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='TopicB'), MetadataRequestTopic(name='TopicC')], allowAu
2020-05-08 01:01:11.064 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId=
{code}
I suspect this is because, since 2.4.1, SubscriptionState.groupSubscription contains only the topics returned in the joinGroup response. As a result, a newly subscribed topic is left out of the metadata request.

Up to 2.4.0, the topics in the joinGroup response were added to groupSubscription; in 2.4.1 they replace its contents. That may be the cause. See SubscriptionState in [https://github.com/apache/kafka/commit/0a5dec0b3a3e1b027230ba766fee0c08b70cc63c|https://github.com/apache/kafka/commit/0a5dec0b3a3e1b027230ba766fee0c08b70cc63c].
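
To make the suspicion concrete, here is a minimal toy model of it. It is not Kafka's SubscriptionState: the class GroupSubscriptionSketch, its fields, and its methods are hypothetical. It assumes that the metadata request is built from groupSubscription whenever that set is non-empty, and that in a single-member group the joinGroup response simply echoes the topics the consumer joined with. The unionWithOwnSubscription flag is only a contrast case and does not represent any released client version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Toy model of the suspected behaviour. NOT Kafka's SubscriptionState;
 * every name in this class is hypothetical.
 */
public class GroupSubscriptionSketch {
    // Topics this consumer asked for in its latest subscribe() call.
    private Set<String> subscription = new TreeSet<>();
    // Topics learned from the latest joinGroup response.
    private Set<String> groupSubscription = new TreeSet<>();
    // Contrast case only: also include the consumer's own topics in the request.
    private final boolean unionWithOwnSubscription;

    public GroupSubscriptionSketch(boolean unionWithOwnSubscription) {
        this.unionWithOwnSubscription = unionWithOwnSubscription;
    }

    public void subscribe(List<String> topics) {
        subscription = new TreeSet<>(topics);
    }

    /** Topics that would go into the next metadata request (assumed rule). */
    public Set<String> metadataTopics() {
        if (groupSubscription.isEmpty()) {
            return new TreeSet<>(subscription);
        }
        Set<String> topics = new TreeSet<>(groupSubscription);
        if (unionWithOwnSubscription) {
            topics.addAll(subscription);
        }
        return topics;
    }

    /** Replace semantics: the response contents overwrite the previous set. */
    public void onJoinResponse(Set<String> topicsInResponse) {
        groupSubscription = new TreeSet<>(topicsInResponse);
    }

    /** Replays the test case: subscribe, build the metadata request, then join. */
    public static List<Set<String>> replay(boolean unionWithOwnSubscription) {
        GroupSubscriptionSketch state = new GroupSubscriptionSketch(unionWithOwnSubscription);
        List<String> topics = new ArrayList<>();
        List<Set<String>> requests = new ArrayList<>();
        for (String topic : Arrays.asList("TopicC", "TopicB", "TopicA")) {
            topics.add(topic);
            state.subscribe(topics);
            requests.add(state.metadataTopics());
            // Assumption: single-member group, so the response echoes our topics.
            state.onJoinResponse(new TreeSet<>(topics));
        }
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // [[TopicC], [TopicC], [TopicB, TopicC]]
        System.out.println(replay(true));  // [[TopicC], [TopicB, TopicC], [TopicA, TopicB, TopicC]]
    }
}
```

With the flag off, the three simulated requests carry TopicC, then TopicC, then TopicB and TopicC, which is the same one-step lag seen in the metadata request log lines above. With the flag on, each request already includes the newly subscribed topic.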

I tried client v2.5.0 and got the same result.


