Posted to dev@kafka.apache.org by "G G (Jira)" <ji...@apache.org> on 2019/12/04 14:23:00 UTC

[jira] [Created] (KAFKA-9266) KafkaConsumer manual assignment does not reset group assignment

G G created KAFKA-9266:
--------------------------

             Summary: KafkaConsumer manual assignment does not reset group assignment
                 Key: KAFKA-9266
                 URL: https://issues.apache.org/jira/browse/KAFKA-9266
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.3.0
            Reporter: G G


When using the manual assignment API, SubscriptionState keeps topics in its groupSubscription member even after the consumer is no longer assigned any of their partitions.

The following code demonstrates the unexpected behavior:
{code:java}
    TopicPartition tp1 = new TopicPartition("a", 0);
    TopicPartition tp2 = new TopicPartition("b", 0);
    LogContext logContext = new LogContext();
    SubscriptionState state = new SubscriptionState(logContext, OffsetResetStrategy.NONE);
    state.assignFromUser(ImmutableSet.of(tp1, tp2));
    state.unsubscribe();
    state.assignFromUser(ImmutableSet.of(tp1));
    assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Succeeds
    
    state.assignFromUser(ImmutableSet.of(tp1, tp2));
    state.assignFromUser(ImmutableSet.of(tp1));
    assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Fails: Expected [a] but was [a, b]
{code}

The problem seems to be that within SubscriptionState.changeSubscription() the groupSubscription only grows and is never trimmed if the assignment is manual:
{code:java}
    private boolean changeSubscription(Set<String> topicsToSubscribe) {
        ...
        groupSubscription = new HashSet<>(groupSubscription);
        groupSubscription.addAll(topicsToSubscribe);
        ....
    }
{code}
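
To make the grow-only effect concrete, here is a minimal, self-contained sketch. GroupSubscriptionSketch and its methods are hypothetical stand-ins written for illustration, not the actual SubscriptionState code; the sketch only assumes that each manual assignment adds its topics to a copy of the existing set, as in the snippet above, and that unsubscribe() clears the set, as the first assertion in the report suggests.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for SubscriptionState; NOT the actual Kafka class.
public class GroupSubscriptionSketch {
    private Set<String> groupSubscription = new HashSet<>();

    // Mirrors the quoted snippet: topics are added to a copy, nothing is removed.
    public void assignGrowOnly(Set<String> topics) {
        groupSubscription = new HashSet<>(groupSubscription);
        groupSubscription.addAll(topics);
    }

    // Assumption: unsubscribe() forgets all remembered topics, which is consistent
    // with the first assertion in the report succeeding after unsubscribe().
    public void unsubscribe() {
        groupSubscription = new HashSet<>();
    }

    public Set<String> groupSubscription() {
        return groupSubscription;
    }

    // Small helper to build a mutable set of topic names.
    public static Set<String> topics(String... names) {
        return new HashSet<>(Arrays.asList(names));
    }

    public static void main(String[] args) {
        GroupSubscriptionSketch state = new GroupSubscriptionSketch();

        // Two manual assignments in a row: topic "b" is never trimmed.
        state.assignGrowOnly(topics("a", "b"));
        state.assignGrowOnly(topics("a"));
        System.out.println(state.groupSubscription().equals(topics("a", "b"))); // prints true

        // With an unsubscribe() in between, only "a" is remembered.
        state.unsubscribe();
        state.assignGrowOnly(topics("a"));
        System.out.println(state.groupSubscription().equals(topics("a"))); // prints true
    }
}
```

In this model the stale topic "b" survives the second assignment unless unsubscribe() is called in between, which is the same pattern as the two assertions in the report.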

This behavior in turn causes the client to send METADATA requests that still include topics whose partitions are no longer assigned:
{code}
KafkaConsumer<String, String> consumer; // construction omitted
consumer.assign(ImmutableList.of(topicPartition1, topicPartition2));
// The poll timeout below is arbitrary.
consumer.poll(Duration.ofMillis(100)); // This will cause a MetadataRequest to be sent to the broker with topic1 and topic2
consumer.assign(ImmutableList.of(topicPartition1));
consumer.poll(Duration.ofMillis(100)); // This will AGAIN cause a MetadataRequest for topic1 and topic2 instead of only topic1
{code}
This in turn causes the deletion of the topic behind topicPartition2 to fail. The workaround is to call consumer.unsubscribe() before the second consumer.assign().




--
This message was sent by Atlassian Jira
(v8.3.4#803005)