You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "G G (Jira)" <ji...@apache.org> on 2019/12/04 14:23:00 UTC
[jira] [Created] (KAFKA-9266) KafkaConsumer manual assignment does
not reset group assignment
G G created KAFKA-9266:
--------------------------
Summary: KafkaConsumer manual assignment does not reset group assignment
Key: KAFKA-9266
URL: https://issues.apache.org/jira/browse/KAFKA-9266
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.3.0
Reporter: G G
When using the manual assignment API, SubscriptionState still remembers group subscriptions in its groupSubscription member of topics to which it is no longer subscribed.
See the following code which shows the unexpected behavior:
{code:java}
TopicPartition tp1 = new TopicPartition("a", 0);
TopicPartition tp2 = new TopicPartition("b", 0);
LogContext logContext = new LogContext();
SubscriptionState state = new SubscriptionState(logContext, OffsetResetStrategy.NONE);
state.assignFromUser(ImmutableSet.of(tp1, tp2));
state.unsubscribe();
state.assignFromUser(ImmutableSet.of(tp1));
assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Succeeds
state.assignFromUser(ImmutableSet.of(tp1, tp2));
state.assignFromUser(ImmutableSet.of(tp1));
assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Fails: Expected [a] but was [a, b]
{code}
The problem seems to be that within SubscriptionState.changeSubscription() the groupSubscription only grows and is never trimmed if the assignment is manual:
{code}
private boolean changeSubscription(Set<String> topicsToSubscribe) {
...
groupSubscription = new HashSet<>(groupSubscription);
groupSubscription.addAll(topicsToSubscribe);
....
}
{code}
This behavior in turn leads to METADATA requests by the client with partitions which are actually no longer assigned:
{code}
KafkaConsumer consumer;
consumer.assign(ImmutableList.of(topicPartition1, topicPartition2));
consumer.poll(); // This will cause a MetadataRequest to be sent to the broker with topic1 and topic2
consumer.assign(ImmutableList.of(topicPartition1));
consumer.poll(); // This will AGAIN cause a MetadataRequest for topic1 and topic2 instead of only topic1
{code}
And this in turn causes the deletion of the topicPartion2 to fail. The workaround is to do a consumer.unassign(); before the second consumer.assign();
--
This message was sent by Atlassian Jira
(v8.3.4#803005)