Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/02 16:52:05 UTC

[kafka] branch 2.5 updated: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (#8777)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 2c30619  KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (#8777)
2c30619 is described below

commit 2c30619c43154b01a1b182755b7385c2394d04d5
Author: showuon <43...@users.noreply.github.com>
AuthorDate: Wed Jun 3 00:50:20 2020 +0800

    KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (#8777)
    
    Fix the failing testMultiConsumerStickyAssignment by correcting a logic error in the allSubscriptionsEqual method.
    
    We create consumerToOwnedPartitions to hold the set of previously owned partitions encoded in each Subscription; it is the basis for the reassignment. In allSubscriptionsEqual, we read the member generation of each subscription and, if the current member's generation is higher, treat all previously recorded owned partitions as invalid and remove them. However, the logic before my fix also removed the current highest-generation member from consumerToOwnedPartitions, which should be kept because it's the curren [...]
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../clients/consumer/internals/AbstractStickyAssignor.java | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
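
The ordering issue described in the commit message can be illustrated with a simplified, standalone sketch. This is not the AbstractStickyAssignor code: the class name GenerationOrderSketch, the method highestGenerationMembers and the bumpBeforeAdd flag are hypothetical, generations are plain ints rather than Optional values, and partition bookkeeping is left out. It only shows why the generation check has to run before the member is added to the highest-generation list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the loop touched by this commit.
class GenerationOrderSketch {
    static final int DEFAULT_GENERATION = -1;

    // Returns the members tracked as belonging to the highest generation.
    // bumpBeforeAdd = true  mirrors the order after the fix,
    // bumpBeforeAdd = false mirrors the order before the fix.
    static List<String> highestGenerationMembers(Map<String, Integer> generations,
                                                 boolean bumpBeforeAdd) {
        List<String> membersOfCurrentHighestGeneration = new ArrayList<>();
        List<String> membersWithOldGeneration = new ArrayList<>();
        int maxGeneration = DEFAULT_GENERATION;

        for (Map.Entry<String, Integer> entry : generations.entrySet()) {
            String consumer = entry.getKey();
            int generation = entry.getValue();

            if (generation >= maxGeneration) {
                if (bumpBeforeAdd && generation > maxGeneration) {
                    // Demote the members seen so far, then record the new maximum.
                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
                    membersOfCurrentHighestGeneration.clear();
                    maxGeneration = generation;
                }

                membersOfCurrentHighestGeneration.add(consumer);

                if (!bumpBeforeAdd && generation > maxGeneration) {
                    // Old order: the member that was just added is demoted as well.
                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
                    membersOfCurrentHighestGeneration.clear();
                    maxGeneration = generation;
                }
            } else {
                // Assumption for this sketch: lower generations count as old.
                membersWithOldGeneration.add(consumer);
            }
        }
        return membersOfCurrentHighestGeneration;
    }

    public static void main(String[] args) {
        Map<String, Integer> generations = new LinkedHashMap<>();
        generations.put("c1", 1);
        generations.put("c2", 2);

        System.out.println(highestGenerationMembers(generations, true));  // prints [c2]
        System.out.println(highestGenerationMembers(generations, false)); // prints []
    }
}
```

With the check placed after the add, the member that raises the maximum generation is cleared together with the stale members, so in this sketch the highest-generation list ends up empty. With the check placed first, only the stale members are demoted and c2 is kept.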

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 353a225..9743688 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -121,6 +121,13 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
             if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
                 || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
 
+                // If the current member's generation is higher, all the previously owned partitions are invalid
+                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
+                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    membersOfCurrentHighestGeneration.clear();
+                    maxGeneration = memberData.generation.get();
+                }
+
                 membersOfCurrentHighestGeneration.add(consumer);
                 for (final TopicPartition tp : memberData.partitions) {
                     // filter out any topics that no longer exist or aren't part of the current subscription
@@ -128,13 +135,6 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
                         ownedPartitions.add(tp);
                     }
                 }
-
-                // If the current member's generation is higher, all the previous owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
             }
         }