Posted to jira@kafka.apache.org by "Matthew de Detrich (Jira)" <ji...@apache.org> on 2022/06/07 13:21:00 UTC
[jira] [Comment Edited] (KAFKA-8420) Graceful handling when consumer switches from subscribe to manual assign
[ https://issues.apache.org/jira/browse/KAFKA-8420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551082#comment-17551082 ]
Matthew de Detrich edited comment on KAFKA-8420 at 6/7/22 1:20 PM:
-------------------------------------------------------------------
To work on this issue, I tried writing a test that replicates what you are describing, and I came across something interesting. The test I wrote looks like this:
{code:java}
@Test
public void gracefulHandlingSwitchSubscribeToManualAssign() {
    ConsumerMetadata metadata = createMetadata(subscription);
    MockClient client = new MockClient(time, metadata);

    initMetadata(client, Collections.singletonMap(topic, 1));
    Node node = metadata.fetch().nodes().get(0);

    KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);

    // Steps 1-2 of the issue description: subscribe, then poll with a zero timeout.
    consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
    prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
    ConsumerRecords<String, String> initialConsumerRecords = consumer.poll(Duration.ofMillis(0));
    assertTrue(initialConsumerRecords.isEmpty());

    // Steps 3-4: switch from subscribe to manual assignment.
    consumer.unsubscribe();
    consumer.assign(singleton(tp0));

    // Step 5: queue a sync-group response and poll again; this poll never returns.
    client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator);
    consumer.poll(Duration.ofSeconds(1));
}
{code}
The problem I am currently hitting is that {{consumer.poll(Duration.ofSeconds(1));}} causes an infinite loop/deadlock. (Originally I had {{consumer.poll(Duration.ofSeconds(0));}}, but that made the {{consumer.poll}} method short-circuit because {{timer.notExpired()}} never executes, so it immediately returned {{ConsumerRecords.empty();}} without the consumer ever sending a request to trigger a sync-group response.)
After spending some time debugging, I found that this is the piece of code that is not terminating: [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L250-L251]. What I find highly confusing is the fact that {{lookupCoordinator()}} does actually complete (in this case it immediately returns {{findCoordinatorFuture}} at [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L294]); however, for some reason the loop at [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L215] never terminates. It doesn't appear to detect that the future has finished, which I believe to be the case. I am not sure if this is related to what you mentioned, i.e.
{quote}In the worst case (i.e. the leader keeps sending incompatible assignments), this would cause the consumer to fall into endless re-joins.
{quote}
but it looks like I have either found something else or I am barking up the wrong tree. Do you have any insights into this, [~guozhang]?
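While chasing a non-terminating wait like the one described above, it can help to bound the loop so the test fails fast instead of hanging. The following is a minimal, self-contained sketch and not Kafka code: {{BoundedPollSketch}}, {{pollUntilDone}} and {{maxPolls}} are hypothetical names, with {{isDone}} standing in for a check on the request future and {{pollOnce}} for a single network poll.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in, not Kafka code: isDone plays the role of a
// "has the future completed?" check and pollOnce the role of one network poll.
final class BoundedPollSketch {

    // Polls until isDone reports true or maxPolls attempts have been made.
    // Returns whether the future-like condition completed.
    static boolean pollUntilDone(BooleanSupplier isDone, Runnable pollOnce, int maxPolls) {
        int polls = 0;
        while (!isDone.getAsBoolean() && polls < maxPolls) {
            pollOnce.run();
            polls++;
        }
        return isDone.getAsBoolean();
    }

    public static void main(String[] args) {
        int[] polls = {0};
        // A condition that completes after three polls.
        boolean completed = pollUntilDone(() -> polls[0] >= 3, () -> polls[0]++, 10);
        // A condition that never completes; the bound stops the loop instead of hanging.
        boolean stuck = pollUntilDone(() -> false, () -> { }, 10);
        System.out.println("completed=" + completed + ", stuck=" + stuck);
    }
}
```

If the bounded loop gives up while the future being inspected in the debugger is already complete, that would suggest the loop is waiting on a different condition than the one expected.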
> Graceful handling when consumer switches from subscribe to manual assign
> ------------------------------------------------------------------------
>
> Key: KAFKA-8420
> URL: https://issues.apache.org/jira/browse/KAFKA-8420
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Guozhang Wang
> Assignee: Matthew de Detrich
> Priority: Major
>
> Today if a consumer switches between subscribe (and hence relies on group rebalance to get its assignment) and manual assign, it may cause unnecessary rebalances. For example:
> 1. consumer.subscribe();
> 2. consumer.poll(); // join-group request sent, returns empty because of the poll timeout
> 3. consumer.unsubscribe();
> 4. consumer.assign(..);
> 5. consumer.poll(); // sync-group response received, and the assigned partitions do not match the current subscription state. In this case it will try to re-join, which is not necessary.
> In the worst case (i.e. the leader keeps sending incompatible assignments), this would cause the consumer to fall into endless re-joins.
> Although it is not a very common usage scenario, it is still worth handling better than the status quo.
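One way to picture more graceful handling of the sequence above is to check how the consumer currently obtains its partitions before reacting to a sync-group response. The following is only a sketch of that idea under assumed names: {{Mode}}, {{Action}} and {{onSyncGroupResponse}} are hypothetical and not part of the Kafka client, and this does not describe how the issue is or will be fixed.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical model, not Kafka code: decides what to do with a sync-group
// response depending on how the consumer currently obtains its partitions.
final class StaleAssignmentSketch {

    enum Mode { NONE, SUBSCRIBED, MANUALLY_ASSIGNED }

    enum Action { APPLY_ASSIGNMENT, IGNORE_STALE_RESPONSE, REJOIN_GROUP }

    static Action onSyncGroupResponse(Mode mode, Set<String> subscribedTopics, Set<String> assignedTopics) {
        // The response belongs to a subscription the consumer has since dropped:
        // ignore it rather than trigger a re-join.
        if (mode != Mode.SUBSCRIBED) {
            return Action.IGNORE_STALE_RESPONSE;
        }
        // Still subscribed and the assignment fits the subscription: apply it.
        if (subscribedTopics.containsAll(assignedTopics)) {
            return Action.APPLY_ASSIGNMENT;
        }
        // Still subscribed but the assignment does not match: re-join.
        return Action.REJOIN_GROUP;
    }

    public static void main(String[] args) {
        // Steps 3-5 of the example: after unsubscribe() and assign(..),
        // a late sync-group response for the old subscription arrives.
        Action action = onSyncGroupResponse(
                Mode.MANUALLY_ASSIGNED,
                Collections.<String>emptySet(),
                Collections.singleton("topic"));
        System.out.println(action);
    }
}
```

In this model, the re-join path is taken only while the consumer is still in subscribe mode, so a consumer that has switched to manual assignment cannot fall into the endless re-joins described above.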
--
This message was sent by Atlassian Jira
(v8.20.7#820007)