Posted to jira@kafka.apache.org by "philipnee (via GitHub)" <gi...@apache.org> on 2023/04/12 20:42:48 UTC

[GitHub] [kafka] philipnee opened a new pull request, #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

philipnee opened a new pull request, #13550:
URL: https://github.com/apache/kafka/pull/13550

   This is a long story, but the incident started in KAFKA-13419, where we observed a member sending out a topic partition it owned in the previous generation after it missed a rebalance cycle due to REBALANCE_IN_PROGRESS.
   
   Ideally, the member should continue holding onto its partitions as long as there's no other owner with a younger generation; however, we need to be defensive about this approach because we can't be sure whether a partition has already been assigned to another member.  Therefore, it is safest to honor only the members with the highest generation and the previous generation during the assignment phase.
   
   In this PR, I made 2 major changes:
   1. In the assignor: we now honor a partition owner that is on generation max - 1, as long as no other owner with a younger generation claims that partition (younger = higher generationId).
   2. After getting a REBALANCE_IN_PROGRESS SyncGroup error, the member immediately resets its generation, which ensures that all of its owned partitions are declared lost if it doesn't rejoin in a timely manner.
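   
   To illustrate the first change, here is a minimal, self-contained sketch of how conflicting claims on a partition could be resolved by generation. `OwnershipSketch`, `Claim`, and `resolveOwners` are hypothetical names and partitions are plain strings; this is a simplified model of the rule described above, not the actual `AbstractStickyAssignor` code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model for illustration; not the Kafka assignor.
final class OwnershipSketch {

    /** One consumer's claim on a set of partitions, made in a given generation. */
    static final class Claim {
        final String consumer;
        final int generation;
        final List<String> partitions;

        Claim(String consumer, int generation, List<String> partitions) {
            this.consumer = consumer;
            this.generation = generation;
            this.partitions = partitions;
        }
    }

    /**
     * Returns partition -> previous owner. The claimant with the higher
     * generation wins; if the highest generation is shared by several
     * claimants, the partition gets no previous owner at all.
     */
    static Map<String, String> resolveOwners(List<Claim> claims) {
        Map<String, Claim> best = new HashMap<>();
        Set<String> contested = new HashSet<>();
        for (Claim claim : claims) {
            for (String partition : claim.partitions) {
                Claim current = best.get(partition);
                if (current == null || claim.generation > current.generation) {
                    // First claim, or a younger (higher) generation takes over.
                    best.put(partition, claim);
                    contested.remove(partition);
                } else if (claim.generation == current.generation) {
                    // Two owners in the same generation: invalidate the claim.
                    contested.add(partition);
                }
                // A claim from an older generation is ignored.
            }
        }
        Map<String, String> owners = new HashMap<>();
        for (Map.Entry<String, Claim> entry : best.entrySet()) {
            if (!contested.contains(entry.getKey())) {
                owners.put(entry.getKey(), entry.getValue().consumer);
            }
        }
        return owners;
    }
}
```

   For example, with c1 claiming t-0 and t-1 in generation 10, c2 claiming t-1 and t-2 in generation 9, and c3 claiming t-2 in generation 9, c1 keeps t-0 and t-1, while t-2 ends up without a previous owner.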


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1169135808


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    resetStateAndGeneration("member missed the rebalance", true);

Review Comment:
   Hey @dajac, thanks for the review. When a member misses a generation, we want to make sure its owned partitions are revoked (due to the generation reset). Regardless, it should still rejoin with its current partitions, and it should continue to hold on to them if it is only 1 generation behind. If it is more than 1 generation behind then, circling back to the beginning of my response, we want to make sure they are revoked because the partitions might have already been reassigned.
   
   Regarding "This makes me think that this will happen regularly in medium to large groups": I think this might not be as uncommon as we think, especially with a large consumer group deployed across multiple pods, considering that a pod can go stale before sending out its SyncGroup request while another consumer in a different pod tries to join the group.
   
   I hope I'm answering your questions here; I apologize if I misunderstood anything.
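   
   As a rough sketch of the rule described in this comment (the class and method names are hypothetical and this is not code from the PR), the "at most one generation behind" check could look like this:

```java
// Hypothetical helper for illustration; not part of the Kafka client.
final class GenerationWindowSketch {

    /**
     * Returns true when a member may keep its owned partitions, i.e. when it
     * is on the highest known generation or exactly one generation behind it.
     */
    static boolean mayKeepOwnedPartitions(int memberGeneration, int maxGeneration) {
        return maxGeneration - memberGeneration <= 1;
    }
}
```

   With a highest generation of 10, members on generations 10 and 9 keep their partitions, while a member on generation 8 has them revoked.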





[GitHub] [kafka] dajac commented on pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13550:
URL: https://github.com/apache/kafka/pull/13550#issuecomment-1527244890

   Merged to trunk and 3.5.




[GitHub] [kafka] philipnee commented on pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13550:
URL: https://github.com/apache/kafka/pull/13550#issuecomment-1527744080

   Thanks David. FWIW, I think we also want to backport it to 3.4; I'll raise a PR.




[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170098942


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    resetStateAndGeneration("member missed the rebalance", true);

Review Comment:
   Adding to my previous comment: I think you will need to set `needsOnJoinPrepare` to true to go through the revocation, as pointed out here: https://github.com/apache/kafka/blob/61530d68ce83467de6190a52da37b3c0af84f0ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L821





[GitHub] [kafka] dajac commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177783905


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -137,7 +136,7 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         Map<TopicPartition, String> allPreviousPartitionsToOwner = new HashMap<>();
 
         for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
-            String consumer = subscriptionEntry.getKey();
+            final String consumer = subscriptionEntry.getKey();
             Subscription subscription = subscriptionEntry.getValue();

Review Comment:
   nit: While here, should this one be final as well?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION));
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+                        if (memberGeneration == otherMemberGeneration) {
+                            if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) {
+                                log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
+                                        + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                    consumer, otherConsumer, tp, memberGeneration);
+                                partitionsWithMultiplePreviousOwners.add(tp);

Review Comment:
   So my understanding is that partitions put in `partitionsWithMultiplePreviousOwners` will be unassigned from all consumers claiming them. Is my understanding correct?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION));

Review Comment:
   nit: Should we define a variable for `memberData.generation.orElse(DEFAULT_GENERATION)`? The same code is reused later.
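   
   A minimal sketch of what this could look like, assuming the generation is an `Optional<Integer>` and `DEFAULT_GENERATION` is -1 (the wrapper class and method are hypothetical, for illustration only):

```java
import java.util.Optional;

// Hypothetical wrapper for illustration; only the local variable matters here.
final class MemberGenerationSketch {
    // Assumed placeholder value for a member without a known generation.
    static final int DEFAULT_GENERATION = -1;

    /** Resolves the member's generation once and reuses the local variable. */
    static int updatedMaxGeneration(int maxGeneration, Optional<Integer> generation) {
        final int memberGeneration = generation.orElse(DEFAULT_GENERATION);
        return Math.max(maxGeneration, memberGeneration);
    }
}
```

   The same `memberGeneration` local can then be reused for the later generation comparisons instead of repeating the `orElse` call.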



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -42,10 +30,22 @@
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.nio.ByteBuffer;

Review Comment:
   nit: Could we revert this as it is not related to the fix?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,96 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig rackConfig) {

Review Comment:
   I actually wonder if this was expected or if it is just a typo in the test case because there is also `testOwnedPartitionsAreInvalidatedForConsumerWithMultipleGeneration`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION));
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+                        if (memberGeneration == otherMemberGeneration) {
+                            if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) {

Review Comment:
   Aren't those two if statements the same? 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,96 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic2, 2), tp(topic3, 2), tp(topic, 1)), currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic3, 2))),
+            new HashSet<>(assignment.get(consumer3)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+        partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), partitions(),
+            DEFAULT_GENERATION, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), currentGeneration - 2, 2));
+        subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), currentGeneration - 3, 3));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        // ensure assigned partitions don't get reassigned
+        assertTrue(assignment.get(consumer1).containsAll(
+            Arrays.asList(tp(topic2, 1),
+                tp(topic3, 0),
+                tp(topic1, 2))));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testOwnedPartitionsAreInvalidatedForConsumerWithMultipleGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2),
+            partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2),
+            partitions(tp(topic, 0), tp(topic2, 1), tp(topic2, 2)), currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic2, 0))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+

Review Comment:
   Should we also add a test for the case where a partition is claimed by two consumers with the same generation?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,96 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig rackConfig) {

Review Comment:
   nit: It seems that we also test the case where a partition is claimed by two consumers with different generations in this test. Should we try to reflect this in its name?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,96 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic2, 2), tp(topic3, 2), tp(topic, 1)), currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic3, 2))),
+            new HashSet<>(assignment.get(consumer3)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+        partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), partitions(),
+            DEFAULT_GENERATION, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), currentGeneration - 2, 2));
+        subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), currentGeneration - 3, 3));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        // ensure assigned partitions don't get reassigned
+        assertTrue(assignment.get(consumer1).containsAll(
+            Arrays.asList(tp(topic2, 1),
+                tp(topic3, 0),
+                tp(topic1, 2))));

Review Comment:
   nit: It may be better to verify the full assignment for completeness.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION));
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+                        if (memberGeneration == otherMemberGeneration) {
+                            if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) {
+                                log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
+                                        + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                    consumer, otherConsumer, tp, memberGeneration);
+                                partitionsWithMultiplePreviousOwners.add(tp);
+                            }
+                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                            continue;
+                        }
 
-                membersOfCurrentHighestGeneration.add(consumer);
-                for (final TopicPartition tp : memberData.partitions) {
-                    // filter out any topics that no longer exist or aren't part of the current subscription
-                    if (allTopics.contains(tp.topic())) {
-                        String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
-                        if (otherConsumer == null) {
-                            // this partition is not owned by other consumer in the same generation
-                            ownedPartitions.add(tp);
-                        } else {
-                            log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
-                                + "same generation {}, this will be invalidated and removed from their previous assignment.",
-                                     consumer, otherConsumer, tp, maxGeneration);
+                        if (memberGeneration > otherMemberGeneration) {
+                            // move partition from the member with an older generation to the member with the newer generation
+                            consumerToOwnedPartitions.get(consumer).add(tp);
                             consumerToOwnedPartitions.get(otherConsumer).remove(tp);
-                            partitionsWithMultiplePreviousOwners.add(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
                         }
+
+                        // if memberGeneration < otherMemberGeneration, the other member continue owns the generation
+                        log.warn("Found multiple members {} and {} claiming the same TopicPartition {} in " +
+                                "different generations. The topic partition wil be assigned to the member with " +
+                                "the higher generation {}.",

Review Comment:
   Should we add the generation for each member? This will also tell us which member got it in the end. I would also use `consumers` instead of `members` in order to be consistent with the other logs.
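   
   For illustration, a message along these lines would carry both generations and name the winner. This is only a sketch: `ConflictLogSketch` and `describeConflict` are hypothetical, and `String.format` stands in for the logger's `{}` placeholders so that the example is self-contained.

```java
// Hypothetical helper for illustration; the real code would call log.warn.
final class ConflictLogSketch {

    /** Builds a warning that names both consumers, their generations, and the winner. */
    static String describeConflict(String consumer, int generation,
                                   String otherConsumer, int otherGeneration,
                                   String topicPartition) {
        // The consumer with the higher generation keeps the partition.
        String winner = generation > otherGeneration ? consumer : otherConsumer;
        return String.format(
            "Found multiple consumers %s (generation %d) and %s (generation %d) claiming the same "
                + "TopicPartition %s in different generations. The topic partition will be assigned to %s.",
            consumer, generation, otherConsumer, otherGeneration, topicPartition, winner);
    }
}
```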



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION));
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+                        if (memberGeneration == otherMemberGeneration) {
+                            if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) {
+                                log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
+                                        + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                    consumer, otherConsumer, tp, memberGeneration);
+                                partitionsWithMultiplePreviousOwners.add(tp);
+                            }
+                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                            continue;

Review Comment:
   This `continue` does not look nice. Should we remove it and use `else if`?





[GitHub] [kafka] dajac commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1168638509


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    resetStateAndGeneration("member missed the rebalance", true);

Review Comment:
   @philipnee I am trying to convince myself about this change. This basically means that whenever a member is late for the sync-group phase, it will abandon all its partitions. Here, late means that the member sends the sync-group request after the next rebalance has started. I wonder how common this is, especially in large groups.
   
   My understanding is that all pending sync-group requests are completed when the leader sends the assignment. When they are completed, the members with partitions to be revoked revoke them and re-join more or less immediately (because we don't commit offsets in the cooperative mode, I think). This makes me think that this will happen regularly in medium to large groups.
   
   Could you elaborate a bit more on the reasoning behind this conservative change? 





[GitHub] [kafka] dajac commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1169901603


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    resetStateAndGeneration("member missed the rebalance", true);

Review Comment:
   > Regardlessly, it should still rejoin with its current partitions and should continue to hold on to its partition if it is only 1 generation behind.
   
   Are you sure about this? It seems to me that whenever the generation is reset, all the partitions are lost and the member rejoins with no owned partitions. Am I wrong?





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170094810


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    resetStateAndGeneration("member missed the rebalance", true);

Review Comment:
   Hmm, I think the partitions are only lost during `onJoinPrepare`. What I'm thinking of is this: https://github.com/apache/kafka/blob/61530d68ce83467de6190a52da37b3c0af84f0ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L507
   
   The gist is: on one of the 4 exceptions thrown in join/sync group, the member should immediately re-send the join request. Are you thinking about how the client handles the illegal generation error? I think it is only thrown during sync group and heartbeat, so just resetting the generation shouldn't immediately cause revocation.





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1171780417


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    savePartitionAndGenerationState();

Review Comment:
   I think the rationale for resetting the generation is to ensure the member's partitions get revoked during `onJoinPrepare`. I'm trying to think whether there's a better way to do this...





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1165960472


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -153,43 +158,45 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
+            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration - 1
                 || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
 
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
-
-                membersOfCurrentHighestGeneration.add(consumer);
                 for (final TopicPartition tp : memberData.partitions) {
-                    // filter out any topics that no longer exist or aren't part of the current subscription
                     if (allTopics.contains(tp.topic())) {
                         String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
                         if (otherConsumer == null) {
                             // this partition is not owned by other consumer in the same generation
                             ownedPartitions.add(tp);
                         } else {
-                            log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
-                                + "same generation {}, this will be invalidated and removed from their previous assignment.",
-                                     consumer, otherConsumer, tp, maxGeneration);
-                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
-                            partitionsWithMultiplePreviousOwners.add(tp);
+                            int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+                            int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+                            if (memberGeneration == otherMemberGeneration) {
+                                if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) {
+                                    log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
+                                            + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                        consumer, otherConsumer, tp, maxGeneration);
+                                    partitionsWithMultiplePreviousOwners.add(tp);
+                                }
+                                consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                                allPreviousPartitionsToOwner.put(tp, consumer);
+                                continue;
+                            }
+
+                            if (memberGeneration > otherMemberGeneration) {
+                                log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in " +

Review Comment:
   I feel... duplicated ownership should be a warning.





[GitHub] [kafka] dajac commented on pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13550:
URL: https://github.com/apache/kafka/pull/13550#issuecomment-1531331123

   Merged https://github.com/apache/kafka/pull/13652 to 3.4.




[GitHub] [kafka] philipnee commented on pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13550:
URL: https://github.com/apache/kafka/pull/13550#issuecomment-1522355547

   Hey @mimaison - Sorry for the late notice. We've been trying to fix this issue for the coming release 3.5.  Would it be possible to include this for the 3.5 release? We should be able to finish reviewing the changes this week. cc @dajac 




[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1178034681


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,96 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig rackConfig) {

Review Comment:
   Ok I renamed the test to `testEnsurePartitionsAssignedToHighestGeneration` as the goal of this test is to make sure partitions are always assigned to the member with the highest generation.
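
For illustration only (this is not the code from the PR), the rule that a contested partition goes to the claimant with the highest generation can be sketched as a standalone class. The names `OwnershipSketch` and `resolveOwners`, the use of plain strings for partitions, and the `-1` placeholder for a missing generation are all assumptions of this sketch; a tie in the same generation is simplified here to leaving the partition without a previous owner.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of resolving duplicate partition claims.
final class OwnershipSketch {
    // Placeholder for "no generation reported"; the value is an assumption of this sketch.
    static final int DEFAULT_GENERATION = -1;

    /**
     * Returns partition -> previous owner. For each partition the claimant with the
     * highest generation wins; if the highest generation is shared by several
     * claimants, the partition is left without a previous owner.
     */
    static Map<String, String> resolveOwners(Map<String, Integer> generations,
                                             Map<String, List<String>> claimed) {
        Map<String, String> owner = new HashMap<>();
        Map<String, Integer> ownerGeneration = new HashMap<>();
        Set<String> tied = new HashSet<>();

        for (Map.Entry<String, List<String>> entry : claimed.entrySet()) {
            String consumer = entry.getKey();
            int generation = generations.getOrDefault(consumer, DEFAULT_GENERATION);
            for (String tp : entry.getValue()) {
                Integer best = ownerGeneration.get(tp);
                if (best == null || generation > best) {
                    // first claim, or a strictly newer generation: this consumer takes over
                    owner.put(tp, consumer);
                    ownerGeneration.put(tp, generation);
                    tied.remove(tp);
                } else if (generation == best) {
                    // same generation as the current best claimant: ownership is ambiguous
                    tied.add(tp);
                }
                // an older generation than the current best claimant is ignored
            }
        }
        // ambiguous partitions get no previous owner
        owner.keySet().removeAll(tied);
        return owner;
    }
}
```

With claims shaped like the ones in the test (one consumer at generation 10, one at 9, one at 8, and two partitions claimed twice), each contested partition resolves to the claimant with the higher generation, and the result does not depend on the order in which consumers are visited.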





[GitHub] [kafka] dajac commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1178764451


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,57 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+            maxGeneration = Math.max(maxGeneration, memberGeneration);
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
 
-                membersOfCurrentHighestGeneration.add(consumer);
-                for (final TopicPartition tp : memberData.partitions) {
-                    // filter out any topics that no longer exist or aren't part of the current subscription
-                    if (allTopics.contains(tp.topic())) {
-                        String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
-                        if (otherConsumer == null) {
-                            // this partition is not owned by other consumer in the same generation
-                            ownedPartitions.add(tp);
-                        } else {
+                        if (memberGeneration == otherMemberGeneration) {

Review Comment:
   nit: Could we put a comment in this branch like we did for the others?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,57 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+            maxGeneration = Math.max(maxGeneration, memberGeneration);
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
 
-                membersOfCurrentHighestGeneration.add(consumer);
-                for (final TopicPartition tp : memberData.partitions) {
-                    // filter out any topics that no longer exist or aren't part of the current subscription
-                    if (allTopics.contains(tp.topic())) {
-                        String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
-                        if (otherConsumer == null) {
-                            // this partition is not owned by other consumer in the same generation
-                            ownedPartitions.add(tp);
-                        } else {
+                        if (memberGeneration == otherMemberGeneration) {
                             log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
-                                + "same generation {}, this will be invalidated and removed from their previous assignment.",
-                                     consumer, otherConsumer, tp, maxGeneration);
-                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                                            + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                    consumer, otherConsumer, tp, memberGeneration);
                             partitionsWithMultiplePreviousOwners.add(tp);
+                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                        } else if (memberGeneration > otherMemberGeneration) {
+                            // move partition from the member with an older generation to the member with the newer generation
+                            consumerToOwnedPartitions.get(consumer).add(tp);
+                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                            // if memberGeneration > otherMemberGeneration, the other member continue owns the generation
+                            log.warn("{} in generation {} and {} in generation {} claiming the same TopicPartition {} in " +

Review Comment:
   nit: I would say `Consumer {} in generation and consumer {} in ...`. This applies to the other comment as well.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,99 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testEnsurePartitionsAssignedToHighestGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+        int currentGeneration = 10;
+
+        // ensure partitions are always assigned to the member with the highest generation
+        // topic, 1 -> [consumer2], consumer3
+        // topic2, 1 -> [consumer2], consumer3
+        // topic, 0 -> [consumer1], consumer3
+        // topic3, 2 -> consumer3
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic3, 0), tp(topic3, 2), tp(topic2, 1)), currentGeneration - 2, 1));

Review Comment:
   nit: Could we put `tp(topic2, 1)` before `topic3`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,99 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testEnsurePartitionsAssignedToHighestGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+        int currentGeneration = 10;
+
+        // ensure partitions are always assigned to the member with the highest generation
+        // topic, 1 -> [consumer2], consumer3
+        // topic2, 1 -> [consumer2], consumer3
+        // topic, 0 -> [consumer1], consumer3
+        // topic3, 2 -> consumer3
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic3, 0), tp(topic3, 2), tp(topic2, 1)), currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic3, 2))),
+            new HashSet<>(assignment.get(consumer3)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+        partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));

Review Comment:
   nit: Could we put `topic1` after `topic`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,57 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+            maxGeneration = Math.max(maxGeneration, memberGeneration);
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
 
-                membersOfCurrentHighestGeneration.add(consumer);
-                for (final TopicPartition tp : memberData.partitions) {
-                    // filter out any topics that no longer exist or aren't part of the current subscription
-                    if (allTopics.contains(tp.topic())) {
-                        String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
-                        if (otherConsumer == null) {
-                            // this partition is not owned by other consumer in the same generation
-                            ownedPartitions.add(tp);
-                        } else {
+                        if (memberGeneration == otherMemberGeneration) {
                             log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
-                                + "same generation {}, this will be invalidated and removed from their previous assignment.",
-                                     consumer, otherConsumer, tp, maxGeneration);
-                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                                            + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                    consumer, otherConsumer, tp, memberGeneration);
                             partitionsWithMultiplePreviousOwners.add(tp);
+                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                        } else if (memberGeneration > otherMemberGeneration) {
+                            // move partition from the member with an older generation to the member with the newer generation
+                            consumerToOwnedPartitions.get(consumer).add(tp);
+                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                            // if memberGeneration > otherMemberGeneration, the other member continue owns the generation
+                            log.warn("{} in generation {} and {} in generation {} claiming the same TopicPartition {} in " +
+                                            "different generations. The topic partition wil be assigned to the member with " +
+                                            "the higher generation {}.",
+                                    consumer, memberGeneration,
+                                    otherConsumer, otherMemberGeneration,
+                                    tp,
+                                    memberGeneration);
+                        } else {
+                            // if memberGeneration < otherMemberGeneration, the other member continue owns the generation

Review Comment:
   nit: Should we remove `if memberGeneration < otherMemberGeneration` as it is implicit?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,99 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testEnsurePartitionsAssignedToHighestGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+        int currentGeneration = 10;
+
+        // ensure partitions are always assigned to the member with the highest generation
+        // topic, 1 -> [consumer2], consumer3
+        // topic2, 1 -> [consumer2], consumer3
+        // topic, 0 -> [consumer1], consumer3
+        // topic3, 2 -> consumer3

Review Comment:
   I don't understand what you are trying to express here. It also seems to me that the mapping is incorrect. Should we just remove this?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,57 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+            maxGeneration = Math.max(maxGeneration, memberGeneration);
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
 
-                membersOfCurrentHighestGeneration.add(consumer);
-                for (final TopicPartition tp : memberData.partitions) {
-                    // filter out any topics that no longer exist or aren't part of the current subscription
-                    if (allTopics.contains(tp.topic())) {
-                        String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
-                        if (otherConsumer == null) {
-                            // this partition is not owned by other consumer in the same generation
-                            ownedPartitions.add(tp);
-                        } else {
+                        if (memberGeneration == otherMemberGeneration) {
                             log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
-                                + "same generation {}, this will be invalidated and removed from their previous assignment.",
-                                     consumer, otherConsumer, tp, maxGeneration);
-                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                                            + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                    consumer, otherConsumer, tp, memberGeneration);
                             partitionsWithMultiplePreviousOwners.add(tp);
+                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                        } else if (memberGeneration > otherMemberGeneration) {
+                            // move partition from the member with an older generation to the member with the newer generation
+                            consumerToOwnedPartitions.get(consumer).add(tp);
+                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                            // if memberGeneration > otherMemberGeneration, the other member continue owns the generation

Review Comment:
   nit: Should we just remove this one? It seems that the earlier comment explains it all.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,99 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testEnsurePartitionsAssignedToHighestGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+        int currentGeneration = 10;
+
+        // ensure partitions are always assigned to the member with the highest generation
+        // topic, 1 -> [consumer2], consumer3
+        // topic2, 1 -> [consumer2], consumer3
+        // topic, 0 -> [consumer1], consumer3
+        // topic3, 2 -> consumer3
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic3, 0), tp(topic3, 2), tp(topic2, 1)), currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic3, 2))),
+            new HashSet<>(assignment.get(consumer3)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+        partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), partitions(),

Review Comment:
   nit: Could we put `partitions(),` on the next line to follow the pattern used by the others? It is easier to read and compare them.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,57 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+            maxGeneration = Math.max(maxGeneration, memberGeneration);
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
 
-                membersOfCurrentHighestGeneration.add(consumer);
-                for (final TopicPartition tp : memberData.partitions) {
-                    // filter out any topics that no longer exist or aren't part of the current subscription
-                    if (allTopics.contains(tp.topic())) {
-                        String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
-                        if (otherConsumer == null) {
-                            // this partition is not owned by other consumer in the same generation
-                            ownedPartitions.add(tp);
-                        } else {
+                        if (memberGeneration == otherMemberGeneration) {
                             log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
-                                + "same generation {}, this will be invalidated and removed from their previous assignment.",
-                                     consumer, otherConsumer, tp, maxGeneration);
-                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                                            + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                    consumer, otherConsumer, tp, memberGeneration);
                             partitionsWithMultiplePreviousOwners.add(tp);
+                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                        } else if (memberGeneration > otherMemberGeneration) {
+                            // move partition from the member with an older generation to the member with the newer generation
+                            consumerToOwnedPartitions.get(consumer).add(tp);

Review Comment:
   nit: We could use `ownedPartitions.add(tp)`.
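
   A minimal sketch of why the shorter form is equivalent: in the hunk above, `ownedPartitions` is created and then stored in `consumerToOwnedPartitions` under `consumer`, so both expressions refer to the same list. The class name, the method name, and the strings standing in for `TopicPartition` are assumptions for illustration only.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   class OwnedPartitionsAliasSketch {
       // Returns true when an add through the local variable is visible through the map lookup.
       static boolean localAddIsVisibleThroughMap() {
           Map<String, List<String>> consumerToOwnedPartitions = new HashMap<>();
           String consumer = "consumer1";

           // Same pattern as the diff: create the list, then register it in the map.
           List<String> ownedPartitions = new ArrayList<>();
           consumerToOwnedPartitions.put(consumer, ownedPartitions);

           // Adding through the local variable...
           ownedPartitions.add("topic-0");

           // ...is visible through the map, because both names point at one list instance.
           List<String> viaMap = consumerToOwnedPartitions.get(consumer);
           return viaMap == ownedPartitions && viaMap.contains("topic-0");
       }
   }
   ```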



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,99 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testEnsurePartitionsAssignedToHighestGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+        int currentGeneration = 10;
+
+        // ensure partitions are always assigned to the member with the highest generation
+        // topic, 1 -> [consumer2], consumer3
+        // topic2, 1 -> [consumer2], consumer3
+        // topic, 0 -> [consumer1], consumer3
+        // topic3, 2 -> consumer3
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic3, 0), tp(topic3, 2), tp(topic2, 1)), currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic3, 2))),
+            new HashSet<>(assignment.get(consumer3)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+        partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), partitions(),
+            DEFAULT_GENERATION, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), currentGeneration - 2, 2));
+        subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), currentGeneration - 3, 3));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        // ensure assigned partitions don't get reassigned
+        assertEquals(new HashSet<>(assignment.get(consumer1)),
+                new HashSet<>(partitions(tp(topic2, 1), tp(topic3, 0), tp(topic1, 2))));

Review Comment:
   nit: We should put the expected value first, i.e. `assertEquals(expected, actual)`.





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170094810


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    resetStateAndGeneration("member missed the rebalance", true);

Review Comment:
   Hmm, I think the partitions are only lost during `onJoinPrepare`. What I have in mind is this: https://github.com/apache/kafka/blob/61530d68ce83467de6190a52da37b3c0af84f0ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L507-L511
   
   The gist is: on any of the 4 exceptions thrown in join/sync group, the member should immediately re-send the join request. Are you thinking about how the client handles the illegal generation error? I think it is only thrown during sync group and heartbeat, so just resetting the generation shouldn't immediately cause revocation.
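
   To make the gist concrete, here is a rough sketch of that retry behaviour. It is not the actual `AbstractCoordinator` code: `JoinError`, `RETRIABLE`, and `joinUntilStable` are hypothetical names, and the exact set of four retriable errors is an assumption (only REBALANCE_IN_PROGRESS and the illegal generation error are named in this thread).

   ```java
   import java.util.Deque;
   import java.util.EnumSet;
   import java.util.Set;

   class JoinRetrySketch {
       // Hypothetical error codes; only the first two retriable ones are named in this thread.
       enum JoinError { NONE, REBALANCE_IN_PROGRESS, ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID, MEMBER_ID_REQUIRED, FATAL }

       // Assumed set of errors after which the member re-sends the join request right away.
       static final Set<JoinError> RETRIABLE = EnumSet.of(
           JoinError.REBALANCE_IN_PROGRESS, JoinError.ILLEGAL_GENERATION,
           JoinError.UNKNOWN_MEMBER_ID, JoinError.MEMBER_ID_REQUIRED);

       // Consumes one simulated response per attempt and returns the number of join attempts made.
       static int joinUntilStable(Deque<JoinError> responses) {
           int attempts = 0;
           while (!responses.isEmpty()) {
               attempts++;
               JoinError error = responses.poll();
               if (error == JoinError.NONE) {
                   return attempts; // joined and synced successfully
               }
               if (!RETRIABLE.contains(error)) {
                   throw new IllegalStateException("non-retriable error: " + error);
               }
               // Retriable: loop around and re-send the join request immediately.
           }
           throw new IllegalStateException("ran out of simulated responses");
       }
   }
   ```

   In this model nothing is revoked inside the loop; a retriable error only triggers another join attempt.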





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1170599794


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    resetStateAndGeneration("member missed the rebalance", true);

Review Comment:
   Offline discussion with @hachikuji: it seems that what we want is to revoke the old partitions but resend them on the subsequent join. Right now, join only sends out the assigned partitions.





[GitHub] [kafka] philipnee commented on pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13550:
URL: https://github.com/apache/kafka/pull/13550#issuecomment-1525753054

   @dajac - Thanks again for the review. I've addressed your comments.




[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177893519


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION));
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+                        if (memberGeneration == otherMemberGeneration) {
+                            if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) {
+                                log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
+                                        + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                    consumer, otherConsumer, tp, memberGeneration);
+                                partitionsWithMultiplePreviousOwners.add(tp);

Review Comment:
   That seems to be the case; see this snippet:
   ```
   for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
       if (ownedPartitions.contains(doublyClaimedPartition)) {
           log.error("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple "
                   + "consumers already in the same generation. Removing it from the ownedPartitions",
               doublyClaimedPartition, consumer);
           ownedPartitions.remove(doublyClaimedPartition);
       }
   }
   ```
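
   For context, the ownership rules in this hunk, together with the snippet above, can be sketched in a self-contained form. This is a simplified model, not the assignor's code: `Member`, `resolveOwnedPartitions`, and the strings standing in for partitions are hypothetical, and the lower-generation branch (restoring the other member as owner) is an assumption because that part of the diff is not shown here.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   class OwnershipSketch {
       // Hypothetical stand-in for a member's subscription: id, generation, claimed partitions.
       static final class Member {
           final String id;
           final int generation;
           final List<String> partitions;

           Member(String id, int generation, List<String> partitions) {
               this.id = id;
               this.generation = generation;
               this.partitions = partitions;
           }
       }

       static Map<String, List<String>> resolveOwnedPartitions(List<Member> members) {
           Map<String, Integer> generationOf = new HashMap<>();
           for (Member m : members) {
               generationOf.put(m.id, m.generation);
           }

           Map<String, String> previousOwner = new HashMap<>();
           Set<String> claimedTwiceInSameGeneration = new HashSet<>();
           Map<String, List<String>> owned = new LinkedHashMap<>();

           for (Member member : members) {
               List<String> ownedPartitions = new ArrayList<>();
               owned.put(member.id, ownedPartitions);
               for (String tp : member.partitions) {
                   String other = previousOwner.put(tp, member.id);
                   if (other == null) {
                       // First claim seen for this partition.
                       ownedPartitions.add(tp);
                       continue;
                   }
                   int otherGeneration = generationOf.get(other);
                   if (member.generation == otherGeneration) {
                       // Same generation: neither claim is trusted.
                       claimedTwiceInSameGeneration.add(tp);
                       owned.get(other).remove(tp);
                   } else if (member.generation > otherGeneration) {
                       // Newer generation wins; take the partition from the older member.
                       ownedPartitions.add(tp);
                       owned.get(other).remove(tp);
                   } else {
                       // Assumed behaviour: the member with the newer generation stays the owner.
                       previousOwner.put(tp, other);
                   }
               }
           }

           // Mirrors the snippet above: drop partitions that were claimed twice in one generation.
           for (List<String> partitions : owned.values()) {
               partitions.removeAll(claimedTwiceInSameGeneration);
           }
           return owned;
       }
   }
   ```

   With members c1 (generation 10, claiming t-0), c2 (generation 9, claiming t-0 and t-1), and c3 (generation 9, claiming t-1), this model leaves c1 owning t-0 and leaves t-1 unowned, so it is free to be reassigned.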





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1178121430


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION));
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+                        if (memberGeneration == otherMemberGeneration) {
+                            if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) {
+                                log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
+                                        + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                    consumer, otherConsumer, tp, memberGeneration);
+                                partitionsWithMultiplePreviousOwners.add(tp);

Review Comment:
   From KAFKA-12984: 
   ```
   ...the assignor will now explicitly look out for partitions that are being claimed by multiple consumers ... we have to invalidate this partition from the ownedPartitions of both consumers, since we can't tell who, if anyone, has the valid claim to this partition.
   ```
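
   As a rough illustration of the rule quoted above, here is a minimal, self-contained sketch. It is not the assignor's code: the class name `DoubleClaimSketch`, the method `invalidateDoubleClaims`, and the use of plain strings for consumers and partitions are all assumptions made for the example. Any partition claimed by more than one consumer is removed from every claimant, since we can't tell whose claim is valid.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; names and types are illustrative, not Kafka's.
public class DoubleClaimSketch {

    /**
     * Removes every partition claimed by more than one consumer from all
     * claimants and returns the set of partitions that were invalidated.
     */
    public static Set<String> invalidateDoubleClaims(Map<String, List<String>> consumerToOwned) {
        Map<String, String> firstOwner = new HashMap<>();
        Set<String> multiplyClaimed = new TreeSet<>();

        // Pass 1: record the first claimant of each partition and flag conflicts.
        for (Map.Entry<String, List<String>> entry : consumerToOwned.entrySet()) {
            for (String partition : entry.getValue()) {
                String other = firstOwner.putIfAbsent(partition, entry.getKey());
                if (other != null && !other.equals(entry.getKey())) {
                    multiplyClaimed.add(partition);
                }
            }
        }

        // Pass 2: nobody keeps a contested partition.
        for (List<String> owned : consumerToOwned.values()) {
            owned.removeAll(multiplyClaimed);
        }
        return multiplyClaimed;
    }

    public static void main(String[] args) {
        Map<String, List<String>> owned = new LinkedHashMap<>();
        owned.put("c1", new ArrayList<>(Arrays.asList("t-0", "t-1")));
        owned.put("c2", new ArrayList<>(Arrays.asList("t-1", "t-2")));
        System.out.println(invalidateDoubleClaims(owned)); // [t-1]
        System.out.println(owned);                         // {c1=[t-0], c2=[t-2]}
    }
}
```

   The invalidated partitions are returned separately so that a caller could, for example, log them or track them as transferring ownership.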





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1165959573


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -153,43 +158,46 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
+            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration - 1
                 || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
 
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {

Review Comment:
   We don't need this because maxGeneration has already been computed, so we already know the set of valid members.
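
   To make the two-pass idea concrete, here is a minimal sketch under stated assumptions: the class `ValidGenerationSketch`, the method `validOwners`, and the sentinel value `-1` for `DEFAULT_GENERATION` are chosen for the example and are not taken from the PR. The first pass computes the maximum generation; the second applies the same condition as the diff above (`generation >= maxGeneration - 1`, or no generation at all when none is known), so nothing has to be cleared retroactively.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; not the AbstractStickyAssignor implementation.
public class ValidGenerationSketch {

    // Assumed sentinel meaning "this member reported no generation".
    static final int DEFAULT_GENERATION = -1;

    /** Returns the members whose owned partitions would be considered valid. */
    public static List<String> validOwners(Map<String, Optional<Integer>> memberGenerations) {
        // Pass 1: compute the highest generation across all members.
        int maxGeneration = DEFAULT_GENERATION;
        for (Optional<Integer> generation : memberGenerations.values()) {
            maxGeneration = Math.max(maxGeneration, generation.orElse(DEFAULT_GENERATION));
        }

        // Pass 2: with the maximum known up front, a single check per member suffices.
        List<String> valid = new ArrayList<>();
        for (Map.Entry<String, Optional<Integer>> entry : memberGenerations.entrySet()) {
            Optional<Integer> generation = entry.getValue();
            boolean recentEnough = generation.isPresent() && generation.get() >= maxGeneration - 1;
            boolean noGenerationKnown = !generation.isPresent() && maxGeneration == DEFAULT_GENERATION;
            if (recentEnough || noGenerationKnown) {
                valid.add(entry.getKey());
            }
        }
        return valid;
    }

    public static void main(String[] args) {
        Map<String, Optional<Integer>> generations = new LinkedHashMap<>();
        generations.put("c1", Optional.of(10));
        generations.put("c2", Optional.of(9));
        generations.put("c3", Optional.of(8));
        generations.put("c4", Optional.empty());
        System.out.println(validOwners(generations)); // [c1, c2]
    }
}
```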





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177895473


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,51 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            maxGeneration = Math.max(maxGeneration, memberData.generation.orElse(DEFAULT_GENERATION));
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
+
+                        if (memberGeneration == otherMemberGeneration) {
+                            if (subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION) == memberData.generation.orElse(DEFAULT_GENERATION)) {
+                                log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
+                                        + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                    consumer, otherConsumer, tp, memberGeneration);
+                                partitionsWithMultiplePreviousOwners.add(tp);
+                            }
+                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                            continue;

Review Comment:
   It could be. I got into the habit of returning early because I think it makes the code easier to read.
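
   For illustration only, the early-exit style could look like the sketch below. It is a simplified stand-in for the loop in the diff, not the PR's code: the `Claim` class, `resolveOwners`, and string-typed partitions are hypothetical, and letting a strictly newer generation clear an earlier tie is a design choice of this sketch. Each branch ends with `continue`, so the remaining cases read top to bottom without nested `else` blocks.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of generation-based conflict resolution using guard clauses.
public class OwnerResolutionSketch {

    /** One consumer's claim on one partition, made in a given generation. */
    public static final class Claim {
        final String consumer;
        final int generation;
        final String partition;

        public Claim(String consumer, int generation, String partition) {
            this.consumer = consumer;
            this.generation = generation;
            this.partition = partition;
        }
    }

    /** Maps each uncontested partition to the consumer with the newest claim. */
    public static Map<String, String> resolveOwners(List<Claim> claims) {
        Map<String, Claim> best = new LinkedHashMap<>();
        Set<String> contested = new HashSet<>();

        for (Claim claim : claims) {
            Claim current = best.get(claim.partition);
            if (current == null) {
                best.put(claim.partition, claim); // first claim on this partition
                continue;
            }
            if (claim.generation < current.generation) {
                continue; // an older generation never displaces a newer one
            }
            if (claim.generation == current.generation) {
                contested.add(claim.partition); // same generation: nobody keeps it
                continue;
            }
            // Strictly newer generation: take over and clear any earlier tie.
            best.put(claim.partition, claim);
            contested.remove(claim.partition);
        }

        Map<String, String> owners = new LinkedHashMap<>();
        for (Claim claim : best.values()) {
            if (!contested.contains(claim.partition)) {
                owners.put(claim.partition, claim.consumer);
            }
        }
        return owners;
    }

    public static void main(String[] args) {
        List<Claim> claims = new ArrayList<>();
        claims.add(new Claim("c1", 10, "t-0"));
        claims.add(new Claim("c2", 9, "t-0"));
        claims.add(new Claim("c2", 9, "t-1"));
        claims.add(new Claim("c3", 9, "t-1"));
        claims.add(new Claim("c3", 9, "t-2"));
        System.out.println(resolveOwners(claims)); // {t-0=c1, t-2=c3}
    }
}
```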





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177937393


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -42,10 +30,22 @@
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.nio.ByteBuffer;

Review Comment:
   Oh yes, will do. Thanks to my IDE's import optimization.





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1176980400


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    savePartitionAndGenerationState();

Review Comment:
   This is discarded in the latest commit.





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177934535


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,96 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic2, 2), tp(topic3, 2), tp(topic, 1)), currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic3, 2))),
+            new HashSet<>(assignment.get(consumer3)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+        partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), partitions(),
+            DEFAULT_GENERATION, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), currentGeneration - 2, 2));
+        subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), currentGeneration - 3, 3));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        // ensure assigned partitions don't get reassigned
+        assertTrue(assignment.get(consumer1).containsAll(
+            Arrays.asList(tp(topic2, 1),
+                tp(topic3, 0),
+                tp(topic1, 2))));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testOwnedPartitionsAreInvalidatedForConsumerWithMultipleGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2),
+            partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2),
+            partitions(tp(topic, 0), tp(topic2, 1), tp(topic2, 2)), currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic2, 0))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+

Review Comment:
   Good idea. I didn't check because I thought there could be an existing test case covering it, but let me check.





[GitHub] [kafka] dajac merged pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac merged PR #13550:
URL: https://github.com/apache/kafka/pull/13550




[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1165947809


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -16,6 +16,13 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.internals.Utils.PartitionComparator;

Review Comment:
   This is "optimized" by the IDE.





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1176980677


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +835,7 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    resetStateAndGeneration("member missed the rebalance", true);

Review Comment:
   This is discarded in the latest commit.





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1171712519


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    savePartitionAndGenerationState();

Review Comment:
   This saves the generation and partition state prior to resetting them.





[GitHub] [kafka] dajac commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1171774551


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    savePartitionAndGenerationState();

Review Comment:
   Couldn't we just rejoin without resetting the generation? In other words, what is the advantage of resetting the generation here if we still want to use it to re-join afterwards?





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1172072549


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -835,6 +839,8 @@ public void handle(SyncGroupResponse syncResponse,
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    savePartitionAndGenerationState();

Review Comment:
   I'm not sure there is a better way to invoke `onPartitionsLost` without resetting the generation. The alternative is to use a flag, but that's rather unclean.





[GitHub] [kafka] mimaison commented on pull request #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #13550:
URL: https://github.com/apache/kafka/pull/13550#issuecomment-1522983087

   @philipnee Yes, we can backport this to 3.5




[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1177936654


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##########
@@ -1038,6 +1038,96 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testOwnedPartitionsAreStableForConsumerWithMultipleGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3),
+            partitions(tp(topic2, 2), tp(topic3, 2), tp(topic, 1)), currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic3, 2))),
+            new HashSet<>(assignment.get(consumer3)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+        partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));
+        partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1), partitions(),
+            DEFAULT_GENERATION, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), currentGeneration - 1, 1));
+        subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), currentGeneration - 2, 2));
+        subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
+            partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), currentGeneration - 3, 3));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        // ensure assigned partitions don't get reassigned
+        assertTrue(assignment.get(consumer1).containsAll(
+            Arrays.asList(tp(topic2, 1),
+                tp(topic3, 0),
+                tp(topic1, 2))));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
+    @EnumSource(RackConfig.class)
+    public void testOwnedPartitionsAreInvalidatedForConsumerWithMultipleGeneration(RackConfig rackConfig) {
+        initializeRacks(rackConfig);
+        Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, partitionInfos(topic, 3));
+        partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
+
+        int currentGeneration = 10;
+
+        subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2),
+            partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1)), currentGeneration, 0));
+        subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2),
+            partitions(tp(topic, 0), tp(topic2, 1), tp(topic2, 2)), currentGeneration - 2, 1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
+        assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1))),
+            new HashSet<>(assignment.get(consumer1)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic2, 0))),
+            new HashSet<>(assignment.get(consumer2)));
+        assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+

Review Comment:
   OK, `testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMultipleConsumersInSameGeneration` might cover it, but I'll double-check that it's what we want.





[GitHub] [kafka] philipnee commented on a diff in pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13550:
URL: https://github.com/apache/kafka/pull/13550#discussion_r1179185376


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##########
@@ -149,47 +148,57 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
             }
 
             MemberData memberData = memberData(subscription);
+            final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
+            maxGeneration = Math.max(maxGeneration, memberGeneration);
 
             List<TopicPartition> ownedPartitions = new ArrayList<>();
             consumerToOwnedPartitions.put(consumer, ownedPartitions);
 
-            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
-            // generation, or it's generation is not present but we have not seen any known generation so far
-            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
-                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
-
-                // If the current member's generation is higher, all the previously owned partitions are invalid
-                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
-                    allPreviousPartitionsToOwner.clear();
-                    partitionsWithMultiplePreviousOwners.clear();
-                    for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
-                        consumerToOwnedPartitions.get(droppedOutConsumer).clear();
-                    }
-
-                    membersOfCurrentHighestGeneration.clear();
-                    maxGeneration = memberData.generation.get();
-                }
+            // the member has a valid generation, so we can consider its owned partitions if it has the highest
+            // generation amongst
+            for (final TopicPartition tp : memberData.partitions) {
+                if (allTopics.contains(tp.topic())) {
+                    String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
+                    if (otherConsumer == null) {
+                        // this partition is not owned by other consumer in the same generation
+                        ownedPartitions.add(tp);
+                    } else {
+                        final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);
 
-                membersOfCurrentHighestGeneration.add(consumer);
-                for (final TopicPartition tp : memberData.partitions) {
-                    // filter out any topics that no longer exist or aren't part of the current subscription
-                    if (allTopics.contains(tp.topic())) {
-                        String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
-                        if (otherConsumer == null) {
-                            // this partition is not owned by other consumer in the same generation
-                            ownedPartitions.add(tp);
-                        } else {
+                        if (memberGeneration == otherMemberGeneration) {
                             log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
-                                + "same generation {}, this will be invalidated and removed from their previous assignment.",
-                                     consumer, otherConsumer, tp, maxGeneration);
-                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                                            + "same generation {}, this will be invalidated and removed from their previous assignment.",
+                                    consumer, otherConsumer, tp, memberGeneration);
                             partitionsWithMultiplePreviousOwners.add(tp);
+                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                        } else if (memberGeneration > otherMemberGeneration) {
+                            // move partition from the member with an older generation to the member with the newer generation
+                            consumerToOwnedPartitions.get(consumer).add(tp);
+                            consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+                            allPreviousPartitionsToOwner.put(tp, consumer);
+                            // if memberGeneration > otherMemberGeneration, the other member continue owns the generation
+                            log.warn("{} in generation {} and {} in generation {} claiming the same TopicPartition {} in " +
+                                            "different generations. The topic partition wil be assigned to the member with " +
+                                            "the higher generation {}.",
+                                    consumer, memberGeneration,
+                                    otherConsumer, otherMemberGeneration,
+                                    tp,
+                                    memberGeneration);
+                        } else {
+                            // if memberGeneration < otherMemberGeneration, the other member continue owns the generation

Review Comment:
   Thanks. There's also a typo: the comment should say the other member continues to own the "topic partition", not the generation.
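
   For reference, the three branches in the hunk above can be read as a single rule: when two members claim the same partition, the one with the higher generation keeps it, and a tie invalidates the claim for both. Below is a minimal, self-contained sketch of that rule. `OwnershipSketch` and `resolveOwner` are hypothetical names used only for illustration; they are not part of `AbstractStickyAssignor`, and the sketch leaves out the bookkeeping the real code does (`consumerToOwnedPartitions`, `partitionsWithMultiplePreviousOwners`, logging).

```java
import java.util.Optional;

public class OwnershipSketch {

    /**
     * Hypothetical helper (not Kafka API): decides which of two claimants
     * keeps a partition, based only on their generations.
     *
     * Returns Optional.empty() when both claim the partition in the same
     * generation, mirroring the branch where the claim is invalidated.
     */
    static Optional<String> resolveOwner(String consumer, int memberGeneration,
                                         String otherConsumer, int otherMemberGeneration) {
        if (memberGeneration == otherMemberGeneration) {
            // Same generation: the claim is ambiguous, so neither member keeps it.
            return Optional.empty();
        } else if (memberGeneration > otherMemberGeneration) {
            // The current member is newer: the partition moves to it.
            return Optional.of(consumer);
        } else {
            // The other member is newer: it continues to own the topic partition.
            return Optional.of(otherConsumer);
        }
    }

    public static void main(String[] args) {
        // Illustrative generations only; the values are made up for this sketch.
        System.out.println(resolveOwner("consumer1", 5, "consumer2", 3));
        System.out.println(resolveOwner("consumer1", 3, "consumer2", 5));
        System.out.println(resolveOwner("consumer1", 5, "consumer2", 5));
    }
}
```

   Returning an empty `Optional` for the tie keeps the "nobody owns it" outcome explicit at the call site; the actual patch expresses the same outcome by removing the partition from the other member's owned list and recording it in `partitionsWithMultiplePreviousOwners`.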


