Posted to jira@kafka.apache.org by "Matthew de Detrich (Jira)" <ji...@apache.org> on 2022/06/07 13:21:00 UTC

[jira] [Comment Edited] (KAFKA-8420) Graceful handling when consumer switches from subscribe to manual assign

    [ https://issues.apache.org/jira/browse/KAFKA-8420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551082#comment-17551082 ] 

Matthew de Detrich edited comment on KAFKA-8420 at 6/7/22 1:20 PM:
-------------------------------------------------------------------

In order to work on this issue, I tried writing a test that replicates what you are describing, and I came across something interesting. The test that I wrote looks like this:
{code:java}
    @Test
    public void gracefulHandlingSwitchSubscribeToManualAssign() {
        ConsumerMetadata metadata = createMetadata(subscription);
        MockClient client = new MockClient(time, metadata);

        initMetadata(client, Collections.singletonMap(topic, 1));
        Node node = metadata.fetch().nodes().get(0);

        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);

        // Subscribe (group-managed assignment) and prepare the rebalance responses
        consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
        prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);

        // A zero-timeout poll returns no records
        ConsumerRecords<String, String> initialConsumerRecords = consumer.poll(Duration.ofMillis(0));
        assertTrue(initialConsumerRecords.isEmpty());

        // Switch from subscribe to manual assign
        consumer.unsubscribe();
        consumer.assign(singleton(tp0));

        // Queue a sync-group response from the coordinator, then poll again:
        // this is the poll that never returns
        client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator);
        consumer.poll(Duration.ofSeconds(1));
    }
{code}
The problem I am currently hitting is that {{consumer.poll(Duration.ofSeconds(1));}} causes an infinite loop/deadlock. (Originally I used {{consumer.poll(Duration.ofSeconds(0));}}, but that made the {{consumer.poll}} method short-circuit: because of the {{timer.notExpired()}} check the loop body never executed, so the method immediately returned {{ConsumerRecords.empty()}} without the consumer ever sending the request that would trigger a sync-group response.)
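
To illustrate the zero-timeout short-circuit in isolation, here is a hypothetical, simplified deadline timer. It is not Kafka's {{Timer}} or {{KafkaConsumer.poll}}; {{SimpleTimer}}, {{pollUntil}} and the 1 ms back-off are assumptions made for illustration. When a wait is written as a loop guarded by {{notExpired()}}, a zero timeout means the deadline has already been reached, so the loop body, and any request or readiness check inside it, never runs:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified deadline timer; not Kafka's own Timer class.
final class SimpleTimer {
    private final long deadlineNanos;

    SimpleTimer(long timeoutMs) {
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    }

    // True while the deadline has not been reached yet.
    boolean notExpired() {
        return System.nanoTime() - deadlineNanos < 0;
    }
}

public class ZeroTimeoutDemo {
    // Hypothetical wait loop: only checks `ready` while the timer is running.
    // Returns how many times the condition was checked.
    static int pollUntil(BooleanSupplier ready, long timeoutMs) {
        SimpleTimer timer = new SimpleTimer(timeoutMs);
        int checks = 0;
        while (timer.notExpired()) {
            checks++;
            if (ready.getAsBoolean()) {
                break;
            }
            try {
                Thread.sleep(1); // brief back-off between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return checks;
    }

    public static void main(String[] args) {
        // Zero timeout: the deadline has already passed, so the body never runs.
        System.out.println(pollUntil(() -> true, 0));    // prints 0
        // Positive timeout: the condition is checked, and here succeeds at once.
        System.out.println(pollUntil(() -> true, 1000)); // prints 1
    }
}
```

With a positive timeout the same loop checks its condition at least once, which is consistent with the test exercising a different code path with a zero and a one-second timeout.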

After spending some time debugging, I found that this is the piece of code that does not terminate: [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L250-L251]. What I find highly confusing is that {{lookupCoordinator()}} does actually complete (in this case it immediately returns {{findCoordinatorFuture}} at [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L294]); however, for some reason the loop at [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L215] never terminates. It does not appear to detect that the future has finished, even though I believe it has. I am not sure if this is related to what you mentioned, i.e. 
{quote}In the worst case (i.e. the leader keeps sending incompatible assignments), this would cause the consumer to fall into endless re-joins.
{quote}
but it looks like I have either found something else or I am barking up the wrong tree. Do you have any insights into this, [~guozhang]?
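
One mechanism that would fit the symptom of a lookup that "returns immediately" while the wait on its result never ends is a lookup that hands back a cached, still-pending future. The sketch below models that pattern with {{CompletableFuture}}; {{CoordinatorLookup}}, {{lookup}}, {{onResponse}} and {{awaitBounded}} are hypothetical names, not the {{AbstractCoordinator}} or {{ConsumerNetworkClient}} API, and whether this is what happens in the test above is only an assumption. Returning the in-flight future is instantaneous, but {{isDone()}} stays false until a response completes it, so a loop that only checks {{!future.isDone()}} would spin indefinitely, whereas a deadline-bounded loop gives up:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class PendingFutureDemo {
    // Hypothetical lookup that reuses a request which is already in flight.
    static final class CoordinatorLookup {
        private CompletableFuture<String> inFlight;

        // Returns immediately: the existing pending future, or a fresh one.
        synchronized CompletableFuture<String> lookup() {
            if (inFlight == null) {
                inFlight = new CompletableFuture<>(); // request "sent", no response yet
            }
            return inFlight;
        }

        // Simulates the response arriving; only this completes the future.
        synchronized void onResponse(String coordinator) {
            if (inFlight != null) {
                inFlight.complete(coordinator);
                inFlight = null;
            }
        }
    }

    // Waits until the future is done or the timeout elapses; returns false on timeout.
    // Without the deadline check, this loop would spin until something completes the future.
    static boolean awaitBounded(CompletableFuture<?> future, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!future.isDone()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.yield();
        }
        return true;
    }

    public static void main(String[] args) {
        CoordinatorLookup lookup = new CoordinatorLookup();
        CompletableFuture<String> first = lookup.lookup();
        CompletableFuture<String> second = lookup.lookup();
        System.out.println(first == second);          // true: same pending future
        System.out.println(awaitBounded(second, 50)); // false: nothing completed it
        lookup.onResponse("node-0");
        System.out.println(awaitBounded(second, 50)); // true
        System.out.println(second.join());            // node-0
    }
}
```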


> Graceful handling when consumer switches from subscribe to manual assign
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-8420
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8420
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Guozhang Wang
>            Assignee: Matthew de Detrich
>            Priority: Major
>
> Today, if a consumer switches between subscribe (and hence relies on a group rebalance to get its assignment) and manual assign, it may cause unnecessary rebalances. For example:
> 1. consumer.subscribe();
> 2. consumer.poll();     // join-group request sent, returns empty because of the poll timeout
> 3. consumer.unsubscribe();
> 4. consumer.assign(..);
> 5. consumer.poll();     // sync-group response received, and the assigned partitions do not match the current subscription state. In this case it will try to re-join, which is not necessary.
> In the worst case (i.e. the leader keeps sending incompatible assignments), this would cause the consumer to fall into endless re-joins.
> Although it is not a very common usage scenario, it is still worth handling better than the status quo.
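
A minimal sketch of the handling the description asks for, assuming the consumer can tell, when a sync-group response arrives, whether it is still in subscribe mode. {{SubscriptionMode}}, {{Action}} and {{onSyncGroupAssignment}} are hypothetical names, not the real {{SubscriptionState}} or {{ConsumerCoordinator}} API, and topic names stand in for partitions to keep it short. A stale group assignment that arrives after the switch to manual assign is dropped instead of triggering a re-join, while a mismatched assignment for a consumer that is still subscribed leads to a re-join as described above:

```java
import java.util.Set;

public class StaleAssignmentDemo {
    // Hypothetical modes; the real consumer tracks this in its own subscription state.
    enum SubscriptionMode { NONE, SUBSCRIBED, MANUALLY_ASSIGNED }

    enum Action { APPLY, REJOIN, IGNORE }

    // Decides what to do with the topics handed out by a sync-group response.
    static Action onSyncGroupAssignment(SubscriptionMode mode,
                                        Set<String> subscribedTopics,
                                        Set<String> assignedTopics) {
        if (mode != SubscriptionMode.SUBSCRIBED) {
            // The consumer left subscribe mode (unsubscribe + assign) while the
            // rebalance was in flight: drop the stale assignment, do not re-join.
            return Action.IGNORE;
        }
        if (!subscribedTopics.containsAll(assignedTopics)) {
            // Still subscribed, but the assignment does not match the subscription.
            return Action.REJOIN;
        }
        return Action.APPLY;
    }

    public static void main(String[] args) {
        Set<String> topics = Set.of("test");
        System.out.println(onSyncGroupAssignment(SubscriptionMode.MANUALLY_ASSIGNED, Set.of(), topics)); // IGNORE
        System.out.println(onSyncGroupAssignment(SubscriptionMode.SUBSCRIBED, Set.of("other"), topics)); // REJOIN
        System.out.println(onSyncGroupAssignment(SubscriptionMode.SUBSCRIBED, topics, topics));          // APPLY
    }
}
```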


