Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/09 03:48:44 UTC

[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r610324204



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -163,117 +160,95 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
                                                                 Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
+        log.debug(String.format("performing constrained assign. partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+            partitionsPerTopic, consumerToOwnedPartitions));
+
         SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
         int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
         int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int numExpectedMaxCapacityMembers = unassignedPartitions.size() % numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMaxCapacityMembers = 0;
 
-        // initialize the assignment map with an empty array of size minQuota for all members
+        // initialize the assignment map with an empty array of size maxQuota for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));

Review comment:
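       We should set the initial capacity of each list to maxQuota rather than minQuota. A member can end up with maxQuota partitions, so sizing the list for maxQuota up front avoids a reallocation of the backing array.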
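
A minimal, standalone sketch of the idea, not the code in AbstractStickyAssignor: it repeats the quota arithmetic from the diff and presizes each member's list to maxQuota. The class name QuotaSketch, the helper names, and the use of String in place of TopicPartition are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, used only to illustrate the review comment.
class QuotaSketch {

    // Smallest number of partitions any member should own.
    static int minQuota(int totalPartitions, int numberOfConsumers) {
        return (int) Math.floor(((double) totalPartitions) / numberOfConsumers);
    }

    // Largest number of partitions any member should own.
    static int maxQuota(int totalPartitions, int numberOfConsumers) {
        return (int) Math.ceil(((double) totalPartitions) / numberOfConsumers);
    }

    // How many members are expected to own exactly maxQuota partitions
    // (zero when the partitions divide evenly among the consumers).
    static int numExpectedMaxCapacityMembers(int totalPartitions, int numberOfConsumers) {
        return totalPartitions % numberOfConsumers;
    }

    // Builds an empty assignment whose lists can each hold maxQuota entries
    // without growing. Strings stand in for TopicPartition here.
    static Map<String, List<String>> emptyAssignment(List<String> consumers, int maxQuota) {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String consumer : consumers) {
            // Initial capacity maxQuota: adding up to maxQuota elements
            // never forces the ArrayList to reallocate its backing array.
            assignment.put(consumer, new ArrayList<>(maxQuota));
        }
        return assignment;
    }
}
```

With 10 partitions and 3 consumers, minQuota is 3, maxQuota is 4, and one member is expected to reach maxQuota (1 * 4 + 2 * 3 = 10). A list presized to minQuota would have to grow for that one member; presizing to maxQuota costs at most one unused slot per member.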




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org