Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/14 18:46:30 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

ableegoldman opened a new pull request #8668:
URL: https://github.com/apache/kafka/pull/8668


   Motivation and pseudo code algorithm in the ticket.
   
   [WIP] still need to finish writing tests and gather rough benchmark results


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r434021704



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -65,9 +72,206 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
+        if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) {
+            log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+                          + "optimized assignment algorithm");
+            partitionsTransferringOwnership = new HashMap<>();
+            return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+                          + "general case assignment algorithm");
+            partitionsTransferringOwnership = null;
+            return generalAssign(partitionsPerTopic, subscriptions);
+        }
+    }
+
+    /**
+     * Returns true iff all consumers have an identical subscription. Also fills out the passed in
+     * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions
+     */
+    private boolean allSubscriptionsEqual(Set<String> allTopics,
+                                          Map<String, Subscription> subscriptions,
+                                          Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
+        Set<String> membersWithOldGeneration = new HashSet<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = DEFAULT_GENERATION;
+
+        Set<String> subscribedTopics = new HashSet<>();
+
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+            String consumer = subscriptionEntry.getKey();
+            Subscription subscription = subscriptionEntry.getValue();
+
+            // initialize the subscribed topics set if this is the first subscription
+            if (subscribedTopics.isEmpty()) {
+                subscribedTopics.addAll(subscription.topics());
+            } else if (!(subscription.topics().size() == subscribedTopics.size()
+                && subscribedTopics.containsAll(subscription.topics()))) {
+                return false;
+            }
+
+            MemberData memberData = memberData(subscription);
+
+            List<TopicPartition> ownedPartitions = new ArrayList<>();
+            consumerToOwnedPartitions.put(consumer, ownedPartitions);
+
+            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
+            // generation, or its generation is not present but we have not seen any known generation so far
+            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
+                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
+
+                membersOfCurrentHighestGeneration.add(consumer);
+                for (final TopicPartition tp : memberData.partitions) {
+                    // filter out any topics that no longer exist or aren't part of the current subscription
+                    if (allTopics.contains(tp.topic())) {
+                        ownedPartitions.add(tp);
+                    }
+                }
+
+                // If the current member's generation is higher, all the previous owned partitions are invalid
+                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
+                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    membersOfCurrentHighestGeneration.clear();

Review comment:
       Just FYI, I introduced this bug right before merging. Luckily the tests caught it -- fix is https://github.com/apache/kafka/pull/8777







[GitHub] [kafka] ableegoldman commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r432246028



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
##########
@@ -425,8 +422,36 @@ public void testSameSubscriptions() {
         assertTrue(assignor.isSticky());
     }
 
+    @Test(timeout = 30 * 1000)
+    public void testLargeAssignmentAndGroupWithUniformSubscription() {
+        int topicCount = 200;

Review comment:
       On trunk, this test fails (hits the 30s timeout) even when you reduce the number of topics to just 1







[GitHub] [kafka] guozhangwang commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r432683840



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -65,9 +69,182 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
+        Set<String> subscribedTopics = new HashSet<>();
+        if (allSubscriptionsEqual(subscriptions, consumerToOwnedPartitions, subscribedTopics)) {
+            log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+                          + "optimized assignment algorithm");
+            return constrainedAssign(partitionsPerTopic, subscribedTopics, consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+                          + "general case assignment algorithm");
+            return generalAssign(partitionsPerTopic, subscriptions);
+        }
+    }
+
+    private boolean allSubscriptionsEqual(Map<String, Subscription> subscriptions,
+                                          Map<String, List<TopicPartition>> consumerToOwnedPartitions,
+                                          Set<String> subscribedTopics) {
+        Set<String> membersWithOldGeneration = new HashSet<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = -1;
+
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+            String consumer = subscriptionEntry.getKey();
+            Subscription subscription = subscriptionEntry.getValue();
+
+            // initialize the subscribed topics set if this is the first subscription
+            if (subscribedTopics.isEmpty()) {
+                subscribedTopics.addAll(subscription.topics());
+            } else if (!(subscription.topics().size() == subscribedTopics.size()
+                && subscribedTopics.containsAll(subscription.topics()))) {
+                return false;
+            }
+
+            MemberData memberData = memberData(subscription);
+
+            // If this member's generation is lower than the current max, or it is not present while
+            // other generations are, consider it as having lost its owned partition
+            if (!memberData.generation.isPresent() && maxGeneration > 0
+                    || memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
+                consumerToOwnedPartitions.put(consumer, new ArrayList<>());

Review comment:
       nit: to be consistent, we can just add `consumer` to `membersWithOldGeneration` and then let them be cleared at the end.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -65,9 +69,182 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
+        Set<String> subscribedTopics = new HashSet<>();
+        if (allSubscriptionsEqual(subscriptions, consumerToOwnedPartitions, subscribedTopics)) {
+            log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+                          + "optimized assignment algorithm");
+            return constrainedAssign(partitionsPerTopic, subscribedTopics, consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+                          + "general case assignment algorithm");
+            return generalAssign(partitionsPerTopic, subscriptions);
+        }
+    }
+
+    private boolean allSubscriptionsEqual(Map<String, Subscription> subscriptions,
+                                          Map<String, List<TopicPartition>> consumerToOwnedPartitions,
+                                          Set<String> subscribedTopics) {
+        Set<String> membersWithOldGeneration = new HashSet<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = -1;
+
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+            String consumer = subscriptionEntry.getKey();
+            Subscription subscription = subscriptionEntry.getValue();
+
+            // initialize the subscribed topics set if this is the first subscription
+            if (subscribedTopics.isEmpty()) {
+                subscribedTopics.addAll(subscription.topics());
+            } else if (!(subscription.topics().size() == subscribedTopics.size()
+                && subscribedTopics.containsAll(subscription.topics()))) {
+                return false;
+            }
+
+            MemberData memberData = memberData(subscription);
+
+            // If this member's generation is lower than the current max, or it is not present while
+            // other generations are, consider it as having lost its owned partition
+            if (!memberData.generation.isPresent() && maxGeneration > 0
+                    || memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
+                consumerToOwnedPartitions.put(consumer, new ArrayList<>());
+            } else {
+                // If the current member's generation is higher, all the previous owned partitions are invalid
+                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
+                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    membersOfCurrentHighestGeneration.clear();
+                    maxGeneration = memberData.generation.get();
+                }
+                membersOfCurrentHighestGeneration.add(consumer);
+                List<TopicPartition> ownedPartitions = memberData.partitions.stream()
+                    .filter(tp -> subscribedTopics.contains(tp.topic()))
+                    .collect(Collectors.toList());
+                consumerToOwnedPartitions.put(consumer, ownedPartitions);
+            }
+        }
+
+        for (String consumer : membersWithOldGeneration) {
+            consumerToOwnedPartitions.get(consumer).clear();
+        }
+        return true;
+    }
+
+    private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
+                                                                Set<String> subscribedTopics,
+                                                                Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
+        SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic, subscribedTopics);
+
+        // Each consumer should end up in exactly one of the below
+        List<String> unfilledMembers = new LinkedList<>();
+        Queue<String> maxCapacityMembers = new LinkedList<>();
+        Queue<String> minCapacityMembers = new LinkedList<>();
+
+        int numberOfConsumers = consumerToOwnedPartitions.size();
+        int minQuota = (int) Math.floor(((double)unassignedPartitions.size()) / numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double)unassignedPartitions.size()) / numberOfConsumers);
+
+        // initialize the assignment map with an empty array of size minQuota for all members
+        Map<String, List<TopicPartition>> assignment = new HashMap<>(
+            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+
+        for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
+            String consumer = consumerEntry.getKey();
+            List<TopicPartition> ownedPartitions = consumerEntry.getValue();
+
+            if (ownedPartitions.size() < minQuota) {
+                assignment.get(consumer).addAll(ownedPartitions);
+                unassignedPartitions.removeAll(ownedPartitions);
+                unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() == minQuota) {
+                assignment.get(consumer).addAll(ownedPartitions);
+                unassignedPartitions.removeAll(ownedPartitions);
+                minCapacityMembers.add(consumer);
+            } else {
+                List<TopicPartition> assignmentPartitions = assignment.get(consumer);
+                Iterator<TopicPartition> ownedPartitionsIter = ownedPartitions.iterator();
+                for (int i = 0; i < maxQuota; ++i) {
+                    TopicPartition tp = ownedPartitionsIter.next();
+                    assignmentPartitions.add(tp);
+                    unassignedPartitions.remove(tp);
+                }
+                maxCapacityMembers.add(consumer);
+            }
+        }
+
+        Collections.sort(unfilledMembers);
+        Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();
+
+        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
+            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+
+            while (unfilledConsumerIter.hasNext()) {
+                String consumer = unfilledConsumerIter.next();
+                List<TopicPartition> consumerAssignment = assignment.get(consumer);
+
+                if (unassignedPartitionsIter.hasNext()) {
+                    consumerAssignment.add(unassignedPartitionsIter.next());
+                    unassignedPartitionsIter.remove();
+                } else {
+                    break;
+                }
+
+                if (consumerAssignment.size() == minQuota) {
+                    minCapacityMembers.add(consumer);
+                    unfilledConsumerIter.remove();
+                }
+            }
+        }
+
+        // If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
+        // from the over-full consumers at max capacity
+        for (String consumer : unfilledMembers) {
+            List<TopicPartition> consumerAssignment = assignment.get(consumer);
+            int remainingCapacity = minQuota - consumerAssignment.size();
+            while (remainingCapacity > 0) {

Review comment:
       NVM, I realized it should never happen.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -65,9 +69,182 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
+        Set<String> subscribedTopics = new HashSet<>();
+        if (allSubscriptionsEqual(subscriptions, consumerToOwnedPartitions, subscribedTopics)) {
+            log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+                          + "optimized assignment algorithm");
+            return constrainedAssign(partitionsPerTopic, subscribedTopics, consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+                          + "general case assignment algorithm");
+            return generalAssign(partitionsPerTopic, subscriptions);
+        }
+    }
+
+    private boolean allSubscriptionsEqual(Map<String, Subscription> subscriptions,
+                                          Map<String, List<TopicPartition>> consumerToOwnedPartitions,
+                                          Set<String> subscribedTopics) {
+        Set<String> membersWithOldGeneration = new HashSet<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = -1;
+
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+            String consumer = subscriptionEntry.getKey();
+            Subscription subscription = subscriptionEntry.getValue();
+
+            // initialize the subscribed topics set if this is the first subscription
+            if (subscribedTopics.isEmpty()) {
+                subscribedTopics.addAll(subscription.topics());
+            } else if (!(subscription.topics().size() == subscribedTopics.size()
+                && subscribedTopics.containsAll(subscription.topics()))) {
+                return false;
+            }
+
+            MemberData memberData = memberData(subscription);
+
+            // If this member's generation is lower than the current max, or it is not present while
+            // other generations are, consider it as having lost its owned partition
+            if (!memberData.generation.isPresent() && maxGeneration > 0
+                    || memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
+                consumerToOwnedPartitions.put(consumer, new ArrayList<>());
+            } else {
+                // If the current member's generation is higher, all the previous owned partitions are invalid
+                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
+                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    membersOfCurrentHighestGeneration.clear();
+                    maxGeneration = memberData.generation.get();
+                }
+                membersOfCurrentHighestGeneration.add(consumer);
+                List<TopicPartition> ownedPartitions = memberData.partitions.stream()
+                    .filter(tp -> subscribedTopics.contains(tp.topic()))
+                    .collect(Collectors.toList());
+                consumerToOwnedPartitions.put(consumer, ownedPartitions);
+            }
+        }
+
+        for (String consumer : membersWithOldGeneration) {
+            consumerToOwnedPartitions.get(consumer).clear();
+        }
+        return true;
+    }
+
+    private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
+                                                                Set<String> subscribedTopics,
+                                                                Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
+        SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic, subscribedTopics);
+
+        // Each consumer should end up in exactly one of the below
+        List<String> unfilledMembers = new LinkedList<>();
+        Queue<String> maxCapacityMembers = new LinkedList<>();
+        Queue<String> minCapacityMembers = new LinkedList<>();
+
+        int numberOfConsumers = consumerToOwnedPartitions.size();
+        int minQuota = (int) Math.floor(((double)unassignedPartitions.size()) / numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double)unassignedPartitions.size()) / numberOfConsumers);
+
+        // initialize the assignment map with an empty array of size minQuota for all members
+        Map<String, List<TopicPartition>> assignment = new HashMap<>(
+            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+
+        for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
+            String consumer = consumerEntry.getKey();
+            List<TopicPartition> ownedPartitions = consumerEntry.getValue();
+
+            if (ownedPartitions.size() < minQuota) {
+                assignment.get(consumer).addAll(ownedPartitions);
+                unassignedPartitions.removeAll(ownedPartitions);
+                unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() == minQuota) {
+                assignment.get(consumer).addAll(ownedPartitions);
+                unassignedPartitions.removeAll(ownedPartitions);
+                minCapacityMembers.add(consumer);
+            } else {
+                List<TopicPartition> assignmentPartitions = assignment.get(consumer);
+                Iterator<TopicPartition> ownedPartitionsIter = ownedPartitions.iterator();
+                for (int i = 0; i < maxQuota; ++i) {
+                    TopicPartition tp = ownedPartitionsIter.next();
+                    assignmentPartitions.add(tp);
+                    unassignedPartitions.remove(tp);
+                }
+                maxCapacityMembers.add(consumer);
+            }
+        }
+
+        Collections.sort(unfilledMembers);
+        Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();
+
+        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
+            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+
+            while (unfilledConsumerIter.hasNext()) {
+                String consumer = unfilledConsumerIter.next();
+                List<TopicPartition> consumerAssignment = assignment.get(consumer);
+
+                if (unassignedPartitionsIter.hasNext()) {
+                    consumerAssignment.add(unassignedPartitionsIter.next());
+                    unassignedPartitionsIter.remove();
+                } else {
+                    break;
+                }
+
+                if (consumerAssignment.size() == minQuota) {
+                    minCapacityMembers.add(consumer);
+                    unfilledConsumerIter.remove();
+                }
+            }
+        }
+
+        // If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
+        // from the over-full consumers at max capacity
+        for (String consumer : unfilledMembers) {
+            List<TopicPartition> consumerAssignment = assignment.get(consumer);
+            int remainingCapacity = minQuota - consumerAssignment.size();
+            while (remainingCapacity > 0) {

Review comment:
       Is it possible that this unfilled consumer has N+1 remaining capacity, while there's only N max consumer only?







[GitHub] [kafka] ableegoldman commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r432730192



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -65,9 +69,182 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
+        Set<String> subscribedTopics = new HashSet<>();
+        if (allSubscriptionsEqual(subscriptions, consumerToOwnedPartitions, subscribedTopics)) {
+            log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+                          + "optimized assignment algorithm");
+            return constrainedAssign(partitionsPerTopic, subscribedTopics, consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+                          + "general case assignment algorithm");
+            return generalAssign(partitionsPerTopic, subscriptions);
+        }
+    }
+
+    private boolean allSubscriptionsEqual(Map<String, Subscription> subscriptions,
+                                          Map<String, List<TopicPartition>> consumerToOwnedPartitions,
+                                          Set<String> subscribedTopics) {
+        Set<String> membersWithOldGeneration = new HashSet<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = -1;
+
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+            String consumer = subscriptionEntry.getKey();
+            Subscription subscription = subscriptionEntry.getValue();
+
+            // initialize the subscribed topics set if this is the first subscription
+            if (subscribedTopics.isEmpty()) {
+                subscribedTopics.addAll(subscription.topics());
+            } else if (!(subscription.topics().size() == subscribedTopics.size()
+                && subscribedTopics.containsAll(subscription.topics()))) {
+                return false;
+            }
+
+            MemberData memberData = memberData(subscription);
+
+            // If this member's generation is lower than the current max, or it is not present while
+            // other generations are, consider it as having lost its owned partition
+            if (!memberData.generation.isPresent() && maxGeneration > 0
+                    || memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
+                consumerToOwnedPartitions.put(consumer, new ArrayList<>());

Review comment:
       Hm, it seems odd to clear it at the end since it's definitely already empty. Note, we're not overwriting the current partitions with an empty array, we're just initializing the assignment for this consumer. I'll add a comment though







[GitHub] [kafka] ableegoldman commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r432200092



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -303,79 +469,17 @@ private int getBalanceScore(Map<String, List<TopicPartition>> assignment) {
                                                 Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
         List<TopicPartition> sortedPartitions = new ArrayList<>();
 
-        if (!isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions)) {

Review comment:
       We can remove all this since we checked for identical subscriptions at the start, so we know that they are not







[GitHub] [kafka] ableegoldman commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-636264014


   @guozhangwang made a few more changes, ready for another review





[GitHub] [kafka] ijuma commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-637591555


   Did we check the build before merging this? It seems to have broken it:
   https://github.com/apache/kafka/pull/8779





[GitHub] [kafka] ableegoldman commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-635750374


   call for review any of @guozhangwang @hachikuji @vvcephei 





[GitHub] [kafka] ableegoldman commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r432727979



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -65,9 +69,182 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
+        Set<String> subscribedTopics = new HashSet<>();
+        if (allSubscriptionsEqual(subscriptions, consumerToOwnedPartitions, subscribedTopics)) {
+            log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+                          + "optimized assignment algorithm");
+            return constrainedAssign(partitionsPerTopic, subscribedTopics, consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+                          + "general case assignment algorithm");
+            return generalAssign(partitionsPerTopic, subscriptions);
+        }
+    }
+
+    private boolean allSubscriptionsEqual(Map<String, Subscription> subscriptions,
+                                          Map<String, List<TopicPartition>> consumerToOwnedPartitions,
+                                          Set<String> subscribedTopics) {
+        Set<String> membersWithOldGeneration = new HashSet<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = -1;
+
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+            String consumer = subscriptionEntry.getKey();
+            Subscription subscription = subscriptionEntry.getValue();
+
+            // initialize the subscribed topics set if this is the first subscription
+            if (subscribedTopics.isEmpty()) {
+                subscribedTopics.addAll(subscription.topics());
+            } else if (!(subscription.topics().size() == subscribedTopics.size()
+                && subscribedTopics.containsAll(subscription.topics()))) {
+                return false;
+            }
+
+            MemberData memberData = memberData(subscription);
+
+            // If this member's generation is lower than the current max, or it is not present while
+            // other generations are, consider it as having lost its owned partition
+            if (!memberData.generation.isPresent() && maxGeneration > 0
+                    || memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
+                consumerToOwnedPartitions.put(consumer, new ArrayList<>());

Review comment:
       ack







[GitHub] [kafka] guozhangwang merged pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #8668:
URL: https://github.com/apache/kafka/pull/8668


   





[GitHub] [kafka] ableegoldman commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r432802206



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -65,9 +72,206 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
+        if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) {
+            log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+                          + "optimized assignment algorithm");
+            partitionsTransferringOwnership = new HashMap<>();
+            return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+                          + "general case assignment algorithm");
+            partitionsTransferringOwnership = null;

Review comment:
       I didn't bother to include this optimization for the general case. We know that the assignment algorithm itself becomes a bottleneck at only 2,000 partitions, so there's no point optimizing something that only becomes a bottleneck in the millions of partitions







[GitHub] [kafka] ableegoldman commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-637154355


   @twmb yeah, I should point out in the ticket that this approach drops the optimization giving preference to older-generation owners of a partition. I actually don't think it would be particularly difficult to incorporate into this new algorithm, but my take was that it still adds more complexity than any benefit it provides.
   We actually dropped this implicitly in the cooperative assignor, since a member with an older generation will have to give up all of its owned partitions before rejoining the group anyway.
   
   It was nice to be able to build up the `partitionsTransferringOwnership` with the additional context we have while crafting the assignment, but to be fair it may be somewhat of an over-optimization at this point. The `adjustAssignment` loop that was building it up from scratch still performed fine up to ~5-10 million partitions. But I figure, better to optimize now and not have to worry about it later 🙂 
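    
    For reference, a rough sketch of the after-the-fact approach this replaces (an assumption about its shape, not the actual `adjustAssignment` code; `newAssignment` and `previousOwnerOf` are made-up names for the finished assignment and a lookup over the prior one): it walks the completed assignment and records every partition whose owner changed.
    
        Map<TopicPartition, String> partitionsTransferringOwnership = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> entry : newAssignment.entrySet()) {
            String consumer = entry.getKey();
            for (TopicPartition tp : entry.getValue()) {
                // previousOwnerOf is hypothetical: who owned tp before this rebalance, if anyone
                String previousOwner = previousOwnerOf(tp);
                if (previousOwner != null && !previousOwner.equals(consumer)) {
                    // tp changes hands, so the cooperative protocol must revoke it first
                    partitionsTransferringOwnership.put(tp, consumer);
                }
            }
        }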





[GitHub] [kafka] ijuma edited a comment on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ijuma edited a comment on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-637592541


   @guozhangwang Looks like 2.6, 2.5 and 2.4 are broken too. You should generally also build locally when cherry-picking.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r432230560



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
##########
@@ -169,10 +169,10 @@ public void testAssignmentWithConflictingPreviousGenerations() {
         TopicPartition tp5 = new TopicPartition(topic, 5);
 
         List<TopicPartition> c1partitions0 = partitions(tp0, tp1, tp4);
-        List<TopicPartition> c2partitions0 = partitions(tp0, tp2, tp3);
+        List<TopicPartition> c2partitions0 = partitions(tp0, tp1, tp2);

Review comment:
       This test was testing an illegal state to begin with: you should never have two consumers in the same generation claim to own the same partition. That fact is the entire reason the generation field was added to the StickyAssignor's subscription userdata in the first place.







[GitHub] [kafka] ijuma commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-637592541


   @guozhangwang Looks like 2.6, 2.5 and 2.4 are broken too. You should generally build locally when cherry-picking.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r432802106



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -40,7 +43,11 @@
 
     public static final int DEFAULT_GENERATION = -1;
 
-    private PartitionMovements partitionMovements;
+    private PartitionMovements partitionMovements = new PartitionMovements();
+
+    // Keep track of the partitions being migrated from one consumer to another during assignment
+    // so the cooperative assignor can adjust the assignment
+    protected Map<TopicPartition, String> partitionsTransferringOwnership = new HashMap<>();

Review comment:
       This is just an optimization for the cooperative case: I found that the assignment time for the eager and cooperative assignor began to diverge once you reached partition counts in the millions. At 10 million partitions for example, the eager assignor hovered around 30s but the cooperative assignor was upwards of 5-6 minutes.
    The discrepancy was entirely due to the `adjustAssignment` method needing to compute the set of partitions transferring ownership in the completed assignment. But we can build up this map during assignment much more efficiently, by taking advantage of the additional context we have at various steps in the algorithm. Tracking and exposing this set to the cooperative assignor cut the assignment time for large partition numbers pretty drastically, putting the cooperative assignor on par with the eager assignor.
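    
    As a minimal sketch of what recording during assignment can look like (assumed shape only, not the exact PR code; `overFullAssignment`, `unfilledAssignment`, and `unfilledConsumer` are illustrative names): when an unfilled consumer takes a partition from an over-quota member in the constrained path, both sides of the transfer are already in hand, so no extra pass over the finished assignment is needed.
    
        // both the losing and the gaining consumer are known at this point in the algorithm,
        // so the transfer can be noted on the spot
        TopicPartition tp = overFullAssignment.remove(overFullAssignment.size() - 1);
        unfilledAssignment.add(tp);
        partitionsTransferringOwnership.put(tp, unfilledConsumer);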







[GitHub] [kafka] ijuma commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-637779000


   @ableegoldman No worries, I do the same. We just need to check the PR result before merging. Additionally, committers should run checkstyle and spotBugs when cherry-picking to older branches.





[GitHub] [kafka] twmb commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
twmb commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-637026123


   From what I can tell, this looks good to me. This loses one mostly insignificant "optimization" that does not really affect anything in reality: prior, if an old-generation member is rejoining, the code would try to re-sticky partitions to those old members for any partitions that are now on overloaded members or are unassigned. This is a pretty minor optimization though, and deleting this logic entirely from my own balancer breaks no tests.
   
   This algorithm primarily differs from mine by doing a bunch of up front checking work, and then doing a "single" pass that performs all assignments. Mine does a bunch of assigning while doing checks, and then does a small balancing pass. Both of these options are great, though!
   
   Pretty nifty observation about building `partitionsTransferringOwnership` while doing assignment. I'm going to have to figure out if that's even possible with my approach--your algorithm can do that because of its one pass, whereas mine loses some context of who started with what by the time it gets to balancing.





[GitHub] [kafka] ableegoldman commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-637672028


   Sorry @ijuma, I think I only ever ran the local tests + checkstyle, not the full suite. My mistake





[GitHub] [kafka] guozhangwang commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-637175464


   Cherry-picked to 2.6/2.5/2.4.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r432802669



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
##########
@@ -105,12 +106,16 @@ public void testOnlyAssignsPartitionsFromSubscribedTopics() {
         String otherTopic = "other";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
-        partitionsPerTopic.put(topic, 3);
-        partitionsPerTopic.put(otherTopic, 3);
-        subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic)));

Review comment:
       This test was also starting with an illegal state -- `partitionsPerTopic` only contains metadata for topics included in the subscription. I noticed that we don't seem to be testing the actual valid case, where some consumers have `ownedPartitions` which are no longer in the subscription, so I just adapted this test for the related purpose
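    
    A hedged sketch of what that adapted setup might look like (the `partitions`/`topics` helpers come from the snippets quoted above; the three-argument `Subscription` constructor is an assumption, not necessarily what the test uses): the consumer is subscribed only to `topic` but still claims a partition of `otherTopic`, which has no entry in `partitionsPerTopic`.
    
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put(topic, 3);  // only the currently subscribed topic has metadata
    
        // stale ownership: one partition belongs to a topic that is no longer subscribed
        List<TopicPartition> owned = partitions(
            new TopicPartition(topic, 0), new TopicPartition(otherTopic, 0));
    
        subscriptions = Collections.singletonMap(consumerId,
            new Subscription(topics(topic), null, owned));
    
        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
        // every assigned partition should come from `topic`; nothing from `otherTopic`
        assertTrue(assignment.get(consumerId).stream().allMatch(tp -> tp.topic().equals(topic)));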







[GitHub] [kafka] ableegoldman commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r432231137



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
##########
@@ -582,35 +578,6 @@ public void testNoExceptionThrownWhenOnlySubscribedTopicDeleted() {
         assertTrue(assignment.get(consumerId).isEmpty());
     }
 
-    @Test
-    public void testConflictingPreviousAssignments() {

Review comment:
       See comment above: this test was starting from an illegal state. Also, it doesn't make sense to place this in the AbstractStickyAssignorTest as the cooperative assignor can't have conflicting previous assignments. If a member thinks it still owns a partition that now belongs to another member, it will have to invoke `onPartitionsLost` before rejoining the group 







[GitHub] [kafka] guozhangwang commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-636103113


   test this please

