You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/16 09:16:22 UTC

[GitHub] [kafka] showuon commented on a change in pull request #11218: MINOR: optimize performAssignment to skip unnecessary check

showuon commented on a change in pull request #11218:
URL: https://github.com/apache/kafka/pull/11218#discussion_r689370424



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -597,28 +603,32 @@ private void updateGroupSubscription(Set<String> topics) {
         // own metadata with the newly added topics so that it will not trigger a subsequent rebalance
         // when these topics gets updated from metadata refresh.
         //
+        // We skip the check for in-product assignors since this will not happen in in-product assignors.
+        //
         // TODO: this is a hack and not something we want to support long-term unless we push regex into the protocol
         //       we may need to modify the ConsumerPartitionAssignor API to better support this case.
-        Set<String> assignedTopics = new HashSet<>();
-        for (Assignment assigned : assignments.values()) {
-            for (TopicPartition tp : assigned.partitions())
-                assignedTopics.add(tp.topic());
-        }
+        if (!isInProductAssignor(assignor.name())) {
+            Set<String> assignedTopics = new HashSet<>();
+            for (Assignment assigned : assignments.values()) {
+                for (TopicPartition tp : assigned.partitions())
+                    assignedTopics.add(tp.topic());
+            }
 
-        if (!assignedTopics.containsAll(allSubscribedTopics)) {
-            Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
-            notAssignedTopics.removeAll(assignedTopics);
-            log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);
-        }
+            if (!assignedTopics.containsAll(allSubscribedTopics)) {
+                Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
+                notAssignedTopics.removeAll(assignedTopics);
+                log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);
+            }
 
-        if (!allSubscribedTopics.containsAll(assignedTopics)) {
-            Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
-            newlyAddedTopics.removeAll(allSubscribedTopics);
-            log.info("The following not-subscribed topics are assigned, and their metadata will be " +
+            if (!allSubscribedTopics.containsAll(assignedTopics)) {
+                Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
+                newlyAddedTopics.removeAll(allSubscribedTopics);
+                log.info("The following not-subscribed topics are assigned, and their metadata will be " +
                     "fetched from the brokers: {}", newlyAddedTopics);
 
-            allSubscribedTopics.addAll(assignedTopics);
-            updateGroupSubscription(allSubscribedTopics);
+                allSubscribedTopics.addAll(newlyAddedTopics);

Review comment:
       side fix: We can just add the `newlyAddedTopics` here because we already computed the it by `assignedTopics - allSubscribedTopics`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org