Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/31 02:00:05 UTC

[GitHub] [kafka] showuon opened a new pull request #11156: KAFKA-13046: add test coverage for AbstractStickyAssignorTest

showuon opened a new pull request #11156:
URL: https://github.com/apache/kafka/pull/11156


   1. Add tests for `partitionsTransferringOwnership`, which should include both revoked partitions and partitions claimed by multiple consumers. When subscriptions are not all equal (the general assignment case), it should be null so that the cooperative assignor knows to compute it from scratch.
   2. Small optimization for the `allPreviousPartitionsToOwner` check.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman merged pull request #11156: KAFKA-13046: add test coverage for AbstractStickyAssignorTest

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #11156:
URL: https://github.com/apache/kafka/pull/11156


   





[GitHub] [kafka] showuon commented on a change in pull request #11156: KAFKA-13046: add test coverage for AbstractStickyAssignorTest

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11156:
URL: https://github.com/apache/kafka/pull/11156#discussion_r680294663



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -142,12 +142,11 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 for (final TopicPartition tp : memberData.partitions) {
                     // filter out any topics that no longer exist or aren't part of the current subscription
                     if (allTopics.contains(tp.topic())) {
-
-                        if (!allPreviousPartitionsToOwner.containsKey(tp)) {
-                            allPreviousPartitionsToOwner.put(tp, consumer);
+                        String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                        if (otherConsumer == null) {
+                            // this partition is not owned by other consumer in the same generation
                             ownedPartitions.add(tp);
                         } else {
-                            String otherConsumer = allPreviousPartitionsToOwner.get(tp);
                             log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
                                 + "same generation {}, this will be invalidated and removed from their previous assignment.",
                                      consumer, otherConsumer, tp, maxGeneration);

Review comment:
       Small optimization here: if the number of partitions is huge, this check runs for every partition of every consumer, so its cost adds up.
   
   From the Javadoc of `Map#put`:
   > Returns:
   the previous value associated with key, or null if there was no mapping for key. (A null return can also indicate that the map previously associated null with key, if the implementation supports null values.)
   
   That is, `put` (insert) on a `Map` does a key lookup, inserts the value, and returns either `null` (no previous mapping) or the previous value. So we can remove the `containsKey` check here and use the return value to tell whether the key already had a value, because we never put `null` as the consumer value.
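   A minimal sketch of the `Map#put` contract this change relies on. It assumes a plain `java.util.HashMap` with `String` keys standing in for `TopicPartition` so the example is self-contained; the `claim` helper is hypothetical and not part of `AbstractStickyAssignor`.

```java
import java.util.HashMap;
import java.util.Map;

public class PutReturnValueSketch {
    // Hypothetical helper: records consumer as the owner of tp and returns the
    // previous owner, or null if there was none. One put() replaces the old
    // containsKey() + put()/get() sequence; this is only unambiguous because
    // a null consumer is never stored as a value.
    static String claim(Map<String, String> owners, String tp, String consumer) {
        return owners.put(tp, consumer);
    }

    public static void main(String[] args) {
        Map<String, String> owners = new HashMap<>();
        System.out.println(claim(owners, "t-0", "A")); // prints "null": no previous mapping
        System.out.println(claim(owners, "t-0", "B")); // prints "A": previous value is returned
        System.out.println(owners.get("t-0"));         // prints "B": the new value overwrote "A"
    }
}
```

   If the map could hold `null` values, a `null` return would be ambiguous (no mapping vs. mapped to `null`), which is exactly the caveat in the Javadoc quoted above.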
   
   We can expect ~99% of cases to be the normal case (i.e. each partition has exactly 1 owner). So originally:
     - **multiple consumers claiming same partition case**: we do the `containsKey` check, then a `get` to find the other owner
     - **normal case**: we do both `containsKey` and `put`
   
   After this change, we optimize for the normal case: we call `put` directly and check for an existing key lazily, via the return value of `put`.
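   To make the before/after comparison concrete, here is a rough sketch that counts keyed map calls for the old and new patterns. `CountingMap`, `callsBefore`, and `callsAfter` are hypothetical names; partitions are plain `String`s, and a call count is only a proxy for lookup cost, not a measurement of wall-clock time.

```java
import java.util.HashMap;
import java.util.List;

public class LookupCountSketch {
    // HashMap that counts keyed calls. HashMap's own put/get/containsKey
    // do not call one another, so each call is counted exactly once.
    static class CountingMap extends HashMap<String, String> {
        int calls = 0;

        @Override
        public boolean containsKey(Object key) {
            calls++;
            return super.containsKey(key);
        }

        @Override
        public String get(Object key) {
            calls++;
            return super.get(key);
        }

        @Override
        public String put(String key, String value) {
            calls++;
            return super.put(key, value);
        }
    }

    // Old pattern: containsKey, then put (new partition) or get (duplicate claim).
    static int callsBefore(List<String[]> claims) {
        CountingMap owners = new CountingMap();
        for (String[] claim : claims) {
            if (!owners.containsKey(claim[0])) {
                owners.put(claim[0], claim[1]);
            } else {
                owners.get(claim[0]); // fetch the other owner, e.g. for logging
            }
        }
        return owners.calls;
    }

    // New pattern: one put per claim; the real code inspects its return value,
    // which is ignored here because only the call count matters.
    static int callsAfter(List<String[]> claims) {
        CountingMap owners = new CountingMap();
        for (String[] claim : claims) {
            owners.put(claim[0], claim[1]);
        }
        return owners.calls;
    }

    public static void main(String[] args) {
        // {partition, consumer}; the last entry is a duplicate claim on t-0
        List<String[]> claims = List.of(
            new String[] {"t-0", "A"}, new String[] {"t-1", "A"},
            new String[] {"t-2", "B"}, new String[] {"t-0", "C"});
        System.out.println(callsBefore(claims)); // prints 8
        System.out.println(callsAfter(claims));  // prints 4
    }
}
```

   Both patterns do 2 calls per claim before the change and 1 after, so the saving applies to the normal case and the duplicate-claim case alike.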
   
   
   
   Note: there is 1 case that behaves slightly differently than before:
    - before: if the key already has a value, we don't put the new value (consumer) into `allPreviousPartitionsToOwner`
    - after: we always put the new value (consumer) into `allPreviousPartitionsToOwner`
    
   This doesn't affect the result because:
     1. What we want to learn from `allPreviousPartitionsToOwner` is whether the partition was also owned by another consumer. We don't care whether it was owned by consumer A, B, or C.
     2. We call `consumerToOwnedPartitions.get(otherConsumer).remove(tp);` to remove the partition from the other owner. After this change, if 3 consumers own the same partition, we do:
        a. consumer A: `allPreviousPartitionsToOwner.put(tp, "A")` returns `null`, so the partition is added to A's `ownedPartitions`.
        b. consumer B: `allPreviousPartitionsToOwner.put(tp, "B")` returns "A", so we know it was also owned by A and remove the partition from A's owned partitions: `consumerToOwnedPartitions.get("A").remove(tp);`
        c. consumer C: `allPreviousPartitionsToOwner.put(tp, "C")` returns "B", so we know it was also owned by B and remove the partition from B's owned partitions: `consumerToOwnedPartitions.get("B").remove(tp);`. This `remove` returns false since the partition is not in B's owned partitions, which is fine.
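   The A/B/C walkthrough above can be traced with a simplified, hypothetical version of the loop. It assumes `String` partitions instead of `TopicPartition`, omits the generation and topic filtering, and `partitionsWithMultipleOwners` is an illustrative name that may not match the real class. Requires Java 9+ for `List.of`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MultiOwnerSketch {
    // Simplified stand-in for the ownership loop: claimed maps each consumer to the
    // partitions it reports owning; duplicates are collected into partitionsWithMultipleOwners.
    static Map<String, List<String>> resolveOwnership(Map<String, List<String>> claimed,
                                                     Set<String> partitionsWithMultipleOwners) {
        Map<String, String> allPreviousPartitionsToOwner = new HashMap<>();
        Map<String, List<String>> consumerToOwnedPartitions = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : claimed.entrySet()) {
            String consumer = entry.getKey();
            List<String> ownedPartitions = new ArrayList<>();
            consumerToOwnedPartitions.put(consumer, ownedPartitions);
            for (String tp : entry.getValue()) {
                String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
                if (otherConsumer == null) {
                    // first claim on tp: keep it as owned
                    ownedPartitions.add(tp);
                } else {
                    // duplicate claim: drop tp from the previous claimant
                    // (returns false if that claimant never kept it, e.g. consumer B)
                    consumerToOwnedPartitions.get(otherConsumer).remove(tp);
                    partitionsWithMultipleOwners.add(tp);
                }
            }
        }
        return consumerToOwnedPartitions;
    }

    public static void main(String[] args) {
        Map<String, List<String>> claimed = new LinkedHashMap<>();
        claimed.put("A", List.of("t-0", "t-1"));
        claimed.put("B", List.of("t-0"));
        claimed.put("C", List.of("t-0", "t-2"));
        Set<String> multi = new HashSet<>();
        Map<String, List<String>> owned = resolveOwnership(claimed, multi);
        System.out.println(owned); // prints {A=[t-1], B=[], C=[t-2]}
        System.out.println(multi); // prints [t-0]
    }
}
```

   Whichever consumer the map ends up pointing at for `t-0`, no consumer keeps it as owned, which is why overwriting the previous owner in the map does not change the outcome.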







[GitHub] [kafka] ableegoldman commented on a change in pull request #11156: KAFKA-13046: add test coverage for AbstractStickyAssignorTest

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11156:
URL: https://github.com/apache/kafka/pull/11156#discussion_r683763243



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -142,12 +142,11 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 for (final TopicPartition tp : memberData.partitions) {
                     // filter out any topics that no longer exist or aren't part of the current subscription
                     if (allTopics.contains(tp.topic())) {
-
-                        if (!allPreviousPartitionsToOwner.containsKey(tp)) {
-                            allPreviousPartitionsToOwner.put(tp, consumer);
+                        String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                        if (otherConsumer == null) {
+                            // this partition is not owned by other consumer in the same generation
                             ownedPartitions.add(tp);
                         } else {
-                            String otherConsumer = allPreviousPartitionsToOwner.get(tp);
                             log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
                                 + "same generation {}, this will be invalidated and removed from their previous assignment.",
                                      consumer, otherConsumer, tp, maxGeneration);

Review comment:
       New logic makes sense to me 👍 -- I agree it doesn't really matter if we overwrite the previous consumer in the map, since what we really care about is whether or not there was one.









[GitHub] [kafka] showuon commented on pull request #11156: KAFKA-13046: add test coverage for AbstractStickyAssignorTest

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11156:
URL: https://github.com/apache/kafka/pull/11156#issuecomment-890300247


   @ableegoldman @guozhangwang, please help review. Thank you.






