You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/31 02:48:56 UTC

[GitHub] [kafka] showuon commented on a change in pull request #11156: KAFKA-13046: add test coverage for AbstractStickyAssignorTest

showuon commented on a change in pull request #11156:
URL: https://github.com/apache/kafka/pull/11156#discussion_r680294663



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -142,12 +142,11 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 for (final TopicPartition tp : memberData.partitions) {
                     // filter out any topics that no longer exist or aren't part of the current subscription
                     if (allTopics.contains(tp.topic())) {
-
-                        if (!allPreviousPartitionsToOwner.containsKey(tp)) {
-                            allPreviousPartitionsToOwner.put(tp, consumer);
+                        String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                        if (otherConsumer == null) {
+                            // this partition is not owned by other consumer in the same generation
                             ownedPartitions.add(tp);
                         } else {
-                            String otherConsumer = allPreviousPartitionsToOwner.get(tp);
                             log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
                                 + "same generation {}, this will be invalidated and removed from their previous assignment.",
                                      consumer, otherConsumer, tp, maxGeneration);

Review comment:
       Small optimization here since if the partition size is huge, this check will have some cost for performance. 
   
   From the java doc in `Map#put`  
   > Returns:
   the previous value associated with key, or null if there was no mapping for key. (A null return can also indicate that the map previously associated null with key, if the implementation supports null values.)
   
   That is, when put(insert) element into `Map`, we will do a key search, and then insert, and return `null`(not found) or previous value. So we can "remove the `containsKey` check" here, and use the return value to check if the key element had a value of not, because we'll never put `null` as the consumer value. 
   
   There will be 1 case that will be a little different than before:
    - before: if there is a value for the key, we won't put the new value(consumer) into `allPreviousPartitionsToOwner`
    - after: we'll always put the new value(consumer) into `allPreviousPartitionsToOwner`. 
    
   It won't impact the result because 
     1. what we want to identify from `allPreviousPartitionsToOwner`, is if the partition was also owned by others. That is, we don't care it was owned by consumer A, or B, or C. 
     2. We'll do `consumerToOwnedPartitions.get(otherConsumer).remove(tp);` to remove the partition from the other parition owner. After this change, if there are 3 consumers owned the same partition, we'll do:
        a. consumer A: `allPreviousPartitionsToOwner.put(tp, "A")`, and also add into its `ownedPartitions`
        b. consumer B: `allPreviousPartitionsToOwner.put(tp, "B")`, and return "A" from the `put` method, found it was also owned by A, remove the partition from A's ownedPartition: `consumerToOwnedPartitions.get("A").remove(tp);`
        c. consumer C: `allPreviousPartitionsToOwner.put(tp, "C")`, and return "B" from the `put` method, found it was also owned by B, remove the partition from B's ownedPartition: `consumerToOwnedPartitions.get("B").remove(tp);`. this remove will return false since it can't find this partition in B's owned partition, which is fine.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org