Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/29 00:43:20 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #8739: KAFKA-10056; Ensure consumer metadata contains new topics on subscription change

hachikuji commented on a change in pull request #8739:
URL: https://github.com/apache/kafka/pull/8739#discussion_r432197918



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -345,7 +345,17 @@ synchronized boolean matchesSubscribedPattern(String topic) {
      *   of the current generation; otherwise it returns the same set as {@link #subscription()}
      */
     synchronized Set<String> metadataTopics() {
-        return groupSubscription.isEmpty() ? subscription : groupSubscription;
+        if (groupSubscription.isEmpty())
+            return subscription;
+        else if (groupSubscription.containsAll(subscription))
+            return groupSubscription;
+        else {
+            // When subscription changes `groupSubscription` may be outdated, ensure that
+            // new subscription topics are returned.
+            Set<String> topics = new HashSet<>(groupSubscription);
+            topics.addAll(subscription);

Review comment:
       I agree this change seems to make sense. I'm trying to understand the edge case a little bit better. It seems the basic scenario is the following:
   
   1. User calls `subscribe`. `subscription` is updated to (A), while the group subscription might still be (B).
   2. We call `requestUpdateForNewTopics`, which bumps the request version.
   3. A metadata update gets triggered and requests (B) with the bumped request version.
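To make steps 1 to 3 concrete, here is a minimal sketch that models only the two fields involved. `MetadataTopicsSketch` is a hypothetical class, not Kafka's `SubscriptionState`. `metadataTopicsBefore` copies the removed line of the diff, and `metadataTopicsAfter` follows the added lines. The hunk above is cut off after `topics.addAll(subscription)`, so the sketch assumes the merged set is returned.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of the fields discussed in the review;
// it is not Kafka's SubscriptionState class.
class MetadataTopicsSketch {
    Set<String> subscription = new HashSet<>();
    Set<String> groupSubscription = new HashSet<>();

    // Pre-patch logic: a non-empty group subscription wins outright.
    Set<String> metadataTopicsBefore() {
        return groupSubscription.isEmpty() ? subscription : groupSubscription;
    }

    // Post-patch logic: topics from a new subscription are always included.
    // Assumption: the truncated hunk ends by returning the merged set.
    Set<String> metadataTopicsAfter() {
        if (groupSubscription.isEmpty())
            return subscription;
        else if (groupSubscription.containsAll(subscription))
            return groupSubscription;
        Set<String> topics = new HashSet<>(groupSubscription);
        topics.addAll(subscription);
        return topics;
    }

    public static void main(String[] args) {
        MetadataTopicsSketch s = new MetadataTopicsSketch();
        s.groupSubscription = Set.of("B"); // stale value from the previous generation
        s.subscription = Set.of("A");      // step 1: user subscribes to (A)
        // Step 3 with the old logic only requests (B), so (A) is never fetched.
        System.out.println(s.metadataTopicsBefore());
        // With the patch, the same request covers both topics (sorted for display).
        System.out.println(new TreeSet<>(s.metadataTopicsAfter()));
    }
}
```

Running `main` prints `[B]` and then `[A, B]`. The bumped request version from step 2 is therefore spent on a request that already includes (A).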
   
   At this point, no further metadata update will be sent, but the consumer should rebalance. The part that confuses me a little bit is that we don't request a metadata update following the rebalance. 
   
   I guess it is due to `SubscriptionState.groupSubscribe`? Assuming that we remain the leader and (A) is the only topic subscribed in the group, we will first change `groupSubscription` to (A). We will then not request a new metadata update, because `groupSubscription` matches `subscription`.
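The leader path can be sketched as follows. This is a simplified model under stated assumptions, not the real coordinator code. `LeaderPathSketch`, `onLeaderAssignment`, and the `metadataUpdateRequests` counter are hypothetical. The sketch assumes, as the paragraph implies, that `groupSubscribe` returns `true` only when the group's topics are not all contained in the local subscription, and that a metadata update is requested only in that case.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the leader path; method bodies are assumptions,
// not Kafka's SubscriptionState or ConsumerCoordinator code.
class LeaderPathSketch {
    Set<String> subscription = new HashSet<>();
    Set<String> groupSubscription = new HashSet<>();
    int metadataUpdateRequests = 0; // hypothetical counter of requested updates

    // Assumed behaviour: replace the group subscription and report whether
    // it contains topics beyond this consumer's own subscription.
    boolean groupSubscribe(Set<String> allMemberTopics) {
        groupSubscription = new HashSet<>(allMemberTopics);
        return !subscription.containsAll(groupSubscription);
    }

    // Hypothetical stand-in for the leader's assignment step.
    void onLeaderAssignment(Set<String> allMemberTopics) {
        if (groupSubscribe(allMemberTopics))
            metadataUpdateRequests++; // only "new" topics trigger a request
    }

    public static void main(String[] args) {
        LeaderPathSketch s = new LeaderPathSketch();
        s.groupSubscription = new HashSet<>(Set.of("B")); // stale value
        s.subscription = new HashSet<>(Set.of("A"));      // new subscription
        s.onLeaderAssignment(Set.of("A")); // (A) is the only topic in the group
        System.out.println(s.groupSubscription);
        System.out.println(s.metadataUpdateRequests);
    }
}
```

This prints `[A]` and `0`. The group subscription catches up to (A), but nothing asks for metadata about (A), because the earlier bumped request only covered (B).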
   
   Alternatively, if we are not the leader, we will call `resetGroupSubscription`, which will set `groupSubscription` to the empty set () but will not request an update.
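A similar sketch for the non-leader path, again as a hypothetical model (`FollowerPathSketch` and `metadataUpdateRequests` are illustrative names). It uses the pre-patch `metadataTopics()` logic from the removed diff line and assumes, as described above, that `resetGroupSubscription` clears the group subscription without requesting an update.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the non-leader path; not Kafka's SubscriptionState.
class FollowerPathSketch {
    Set<String> subscription = new HashSet<>();
    Set<String> groupSubscription = new HashSet<>();
    int metadataUpdateRequests = 0; // hypothetical counter; nothing increments it here

    // Pre-patch metadataTopics() logic, copied from the removed diff line.
    Set<String> metadataTopics() {
        return groupSubscription.isEmpty() ? subscription : groupSubscription;
    }

    // Assumed behaviour: clear the group subscription, request no update.
    void resetGroupSubscription() {
        groupSubscription = new HashSet<>();
    }

    public static void main(String[] args) {
        FollowerPathSketch s = new FollowerPathSketch();
        s.groupSubscription = new HashSet<>(Set.of("B")); // stale value
        s.subscription = new HashSet<>(Set.of("A"));      // new subscription
        System.out.println(s.metadataTopics());       // what step 3 fetched
        s.resetGroupSubscription();
        System.out.println(s.metadataTopics());       // what we now want
        System.out.println(s.metadataUpdateRequests); // but nothing asked for it
    }
}
```

This prints `[B]`, `[A]`, and `0`. After the reset, `metadataTopics()` would return (A), but no update is requested, so the metadata for (A) is not fetched by this path.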
   
   Do I have that right? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org