You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/02 16:52:05 UTC
[kafka] branch 2.5 updated: KAFKA-10082: Fix the failed
testMultiConsumerStickyAssignment (#8777)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 2c30619 KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (#8777)
2c30619 is described below
commit 2c30619c43154b01a1b182755b7385c2394d04d5
Author: showuon <43...@users.noreply.github.com>
AuthorDate: Wed Jun 3 00:50:20 2020 +0800
KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (#8777)
Fix the failed testMultiConsumerStickyAssignment by modifying the logic error in allSubscriptionsEqual method.
We will create the consumerToOwnedPartitions to keep the set of previously owned partitions encoded in the Subscription. It's our basis to do the reassignment. In the allSubscriptionsEqual, we'll get the member generation of the subscription, and remove all previously owned partitions as invalid if the current generation is higher. However, the logic before my fix, will remove the current highest member out of the consumerToOwnedPartitions, which should be kept because it's the curren [...]
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../clients/consumer/internals/AbstractStickyAssignor.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 353a225..9743688 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -121,6 +121,13 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
|| !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
+ // If the current member's generation is higher, all the previously owned partitions are invalid
+ if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
+ membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+ membersOfCurrentHighestGeneration.clear();
+ maxGeneration = memberData.generation.get();
+ }
+
membersOfCurrentHighestGeneration.add(consumer);
for (final TopicPartition tp : memberData.partitions) {
// filter out any topics that no longer exist or aren't part of the current subscription
@@ -128,13 +135,6 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
ownedPartitions.add(tp);
}
}
-
- // If the current member's generation is higher, all the previous owned partitions are invalid
- if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
- membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
- membersOfCurrentHighestGeneration.clear();
- maxGeneration = memberData.generation.get();
- }
}
}