Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/29 19:31:36 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

guozhangwang commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r432683840



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -65,9 +69,182 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
+        Set<String> subscribedTopics = new HashSet<>();
+        if (allSubscriptionsEqual(subscriptions, consumerToOwnedPartitions, subscribedTopics)) {
+            log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+                          + "optimized assignment algorithm");
+            return constrainedAssign(partitionsPerTopic, subscribedTopics, consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+                          + "general case assignment algorithm");
+            return generalAssign(partitionsPerTopic, subscriptions);
+        }
+    }
+
+    private boolean allSubscriptionsEqual(Map<String, Subscription> subscriptions,
+                                          Map<String, List<TopicPartition>> consumerToOwnedPartitions,
+                                          Set<String> subscribedTopics) {
+        Set<String> membersWithOldGeneration = new HashSet<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = -1;
+
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+            String consumer = subscriptionEntry.getKey();
+            Subscription subscription = subscriptionEntry.getValue();
+
+            // initialize the subscribed topics set if this is the first subscription
+            if (subscribedTopics.isEmpty()) {
+                subscribedTopics.addAll(subscription.topics());
+            } else if (!(subscription.topics().size() == subscribedTopics.size()
+                && subscribedTopics.containsAll(subscription.topics()))) {
+                return false;
+            }
+
+            MemberData memberData = memberData(subscription);
+
+            // If this member's generation is lower than the current max, or it is not present while
+            // other generations are, consider it as having lost its owned partition
+            if (!memberData.generation.isPresent() && maxGeneration > 0
+                    || memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
+                consumerToOwnedPartitions.put(consumer, new ArrayList<>());

Review comment:
       nit: to be consistent, we can just add `consumer` to `membersWithOldGeneration` here and let its owned partitions be cleared at the end.
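
A minimal sketch of what this suggestion could look like, using simplified stand-ins rather than the PR's actual code: `StaleGenerationSketch`, `Member`, and `ownedPartitions` are hypothetical names, partitions are plain strings, and the subscription-equality check and topic filter are omitted. One assumption worth noting: the stale consumer still needs an entry in `consumerToOwnedPartitions`, otherwise the final loop's `get(consumer)` would return null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified stand-in for the generation handling in allSubscriptionsEqual.
class StaleGenerationSketch {

    // Hypothetical stand-in for MemberData: owned partitions plus an optional generation.
    static final class Member {
        final List<String> partitions;
        final Optional<Integer> generation;

        Member(List<String> partitions, Optional<Integer> generation) {
            this.partitions = partitions;
            this.generation = generation;
        }
    }

    static Map<String, List<String>> ownedPartitions(Map<String, Member> members) {
        Map<String, List<String>> consumerToOwnedPartitions = new HashMap<>();
        Set<String> membersWithOldGeneration = new HashSet<>();
        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
        int maxGeneration = -1;

        for (Map.Entry<String, Member> entry : members.entrySet()) {
            String consumer = entry.getKey();
            Member member = entry.getValue();

            // Every consumer gets a mutable entry, so the final loop can clear it safely.
            consumerToOwnedPartitions.put(consumer, new ArrayList<>(member.partitions));

            boolean stale = (!member.generation.isPresent() && maxGeneration > 0)
                || (member.generation.isPresent() && member.generation.get() < maxGeneration);
            if (stale) {
                // The suggestion: only record the consumer here, clear it at the end.
                membersWithOldGeneration.add(consumer);
            } else {
                if (member.generation.isPresent() && member.generation.get() > maxGeneration) {
                    // A newer generation invalidates everything seen so far.
                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
                    membersOfCurrentHighestGeneration.clear();
                    maxGeneration = member.generation.get();
                }
                membersOfCurrentHighestGeneration.add(consumer);
            }
        }

        // Single place where stale ownership is dropped.
        for (String consumer : membersWithOldGeneration) {
            consumerToOwnedPartitions.get(consumer).clear();
        }
        return consumerToOwnedPartitions;
    }

    public static void main(String[] args) {
        Map<String, Member> members = new LinkedHashMap<>();
        members.put("c1", new Member(Arrays.asList("t-0"), Optional.of(1)));
        members.put("c2", new Member(Arrays.asList("t-1"), Optional.of(2)));
        members.put("c3", new Member(Arrays.asList("t-2"), Optional.empty()));
        System.out.println(ownedPartitions(members));
    }
}
```

With this shape, both kinds of stale member (seen before and seen after the highest generation) are handled by the same clearing loop.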

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -65,9 +69,182 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
+        Set<String> subscribedTopics = new HashSet<>();
+        if (allSubscriptionsEqual(subscriptions, consumerToOwnedPartitions, subscribedTopics)) {
+            log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+                          + "optimized assignment algorithm");
+            return constrainedAssign(partitionsPerTopic, subscribedTopics, consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+                          + "general case assignment algorithm");
+            return generalAssign(partitionsPerTopic, subscriptions);
+        }
+    }
+
+    private boolean allSubscriptionsEqual(Map<String, Subscription> subscriptions,
+                                          Map<String, List<TopicPartition>> consumerToOwnedPartitions,
+                                          Set<String> subscribedTopics) {
+        Set<String> membersWithOldGeneration = new HashSet<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = -1;
+
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+            String consumer = subscriptionEntry.getKey();
+            Subscription subscription = subscriptionEntry.getValue();
+
+            // initialize the subscribed topics set if this is the first subscription
+            if (subscribedTopics.isEmpty()) {
+                subscribedTopics.addAll(subscription.topics());
+            } else if (!(subscription.topics().size() == subscribedTopics.size()
+                && subscribedTopics.containsAll(subscription.topics()))) {
+                return false;
+            }
+
+            MemberData memberData = memberData(subscription);
+
+            // If this member's generation is lower than the current max, or it is not present while
+            // other generations are, consider it as having lost its owned partition
+            if (!memberData.generation.isPresent() && maxGeneration > 0
+                    || memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
+                consumerToOwnedPartitions.put(consumer, new ArrayList<>());
+            } else {
+                // If the current member's generation is higher, all the previous owned partitions are invalid
+                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
+                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    membersOfCurrentHighestGeneration.clear();
+                    maxGeneration = memberData.generation.get();
+                }
+                membersOfCurrentHighestGeneration.add(consumer);
+                List<TopicPartition> ownedPartitions = memberData.partitions.stream()
+                    .filter(tp -> subscribedTopics.contains(tp.topic()))
+                    .collect(Collectors.toList());
+                consumerToOwnedPartitions.put(consumer, ownedPartitions);
+            }
+        }
+
+        for (String consumer : membersWithOldGeneration) {
+            consumerToOwnedPartitions.get(consumer).clear();
+        }
+        return true;
+    }
+
+    private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
+                                                                Set<String> subscribedTopics,
+                                                                Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
+        SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic, subscribedTopics);
+
+        // Each consumer should end up in exactly one of the below
+        List<String> unfilledMembers = new LinkedList<>();
+        Queue<String> maxCapacityMembers = new LinkedList<>();
+        Queue<String> minCapacityMembers = new LinkedList<>();
+
+        int numberOfConsumers = consumerToOwnedPartitions.size();
+        int minQuota = (int) Math.floor(((double)unassignedPartitions.size()) / numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double)unassignedPartitions.size()) / numberOfConsumers);
+
+        // initialize the assignment map with an empty array of size minQuota for all members
+        Map<String, List<TopicPartition>> assignment = new HashMap<>(
+            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+
+        for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
+            String consumer = consumerEntry.getKey();
+            List<TopicPartition> ownedPartitions = consumerEntry.getValue();
+
+            if (ownedPartitions.size() < minQuota) {
+                assignment.get(consumer).addAll(ownedPartitions);
+                unassignedPartitions.removeAll(ownedPartitions);
+                unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() == minQuota) {
+                assignment.get(consumer).addAll(ownedPartitions);
+                unassignedPartitions.removeAll(ownedPartitions);
+                minCapacityMembers.add(consumer);
+            } else {
+                List<TopicPartition> assignmentPartitions = assignment.get(consumer);
+                Iterator<TopicPartition> ownedPartitionsIter = ownedPartitions.iterator();
+                for (int i = 0; i < maxQuota; ++i) {
+                    TopicPartition tp = ownedPartitionsIter.next();
+                    assignmentPartitions.add(tp);
+                    unassignedPartitions.remove(tp);
+                }
+                maxCapacityMembers.add(consumer);
+            }
+        }
+
+        Collections.sort(unfilledMembers);
+        Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();
+
+        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
+            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+
+            while (unfilledConsumerIter.hasNext()) {
+                String consumer = unfilledConsumerIter.next();
+                List<TopicPartition> consumerAssignment = assignment.get(consumer);
+
+                if (unassignedPartitionsIter.hasNext()) {
+                    consumerAssignment.add(unassignedPartitionsIter.next());
+                    unassignedPartitionsIter.remove();
+                } else {
+                    break;
+                }
+
+                if (consumerAssignment.size() == minQuota) {
+                    minCapacityMembers.add(consumer);
+                    unfilledConsumerIter.remove();
+                }
+            }
+        }
+
+        // If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
+        // from the over-full consumers at max capacity
+        for (String consumer : unfilledMembers) {
+            List<TopicPartition> consumerAssignment = assignment.get(consumer);
+            int remainingCapacity = minQuota - consumerAssignment.size();
+            while (remainingCapacity > 0) {

Review comment:
       Never mind, I realized this should never happen.

Review comment:
       Is it possible that this unfilled consumer has N+1 remaining capacity while there are only N max-capacity consumers?
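
One way to reason about this question is a counting argument, sketched here under two assumptions: every partition is already assigned (the unassigned pool ran out), and no consumer holds more than `maxQuota`. Write the partition count as `C * minQuota + r` with `0 <= r < C`. If `D` is the total deficit of the unfilled consumers and `S` is the number of consumers holding `maxQuota = minQuota + 1`, the sizes sum to `C * minQuota - D + S`, so `S - D = r >= 0` and the deficit never exceeds the number of max-capacity consumers. When the division is exact, `r = 0` and no consumer can be above `minQuota`, so `D = 0`. The brute-force check below is a hypothetical helper (`QuotaDeficitSketch` is not part of the PR) that enumerates every size vector for small inputs.

```java
// Hypothetical helper, not part of the PR: brute-force check of the counting argument.
class QuotaDeficitSketch {

    // Enumerates every way to give each consumer between 0 and maxQuota partitions.
    // For vectors that use up all partitions, checks deficit <= number of max-capacity consumers.
    static boolean check(int[] sizes, int index, int remaining, int minQuota, int maxQuota) {
        if (index == sizes.length) {
            if (remaining != 0) {
                return true; // not a complete assignment, nothing to check
            }
            int deficit = 0;
            int stealable = 0;
            for (int size : sizes) {
                if (size < minQuota) {
                    deficit += minQuota - size;
                } else if (size > minQuota) {
                    stealable++;
                }
            }
            return deficit <= stealable;
        }
        for (int size = 0; size <= Math.min(maxQuota, remaining); size++) {
            sizes[index] = size;
            if (!check(sizes, index + 1, remaining - size, minQuota, maxQuota)) {
                return false;
            }
        }
        return true;
    }

    static boolean holdsForAll(int partitions, int consumers) {
        int minQuota = partitions / consumers;                    // floor
        int maxQuota = (partitions + consumers - 1) / consumers;  // ceil
        return check(new int[consumers], 0, partitions, minQuota, maxQuota);
    }

    public static void main(String[] args) {
        boolean all = true;
        for (int partitions = 1; partitions <= 12; partitions++) {
            for (int consumers = 1; consumers <= 5; consumers++) {
                all &= holdsForAll(partitions, consumers);
            }
        }
        System.out.println("deficit <= max-capacity consumers in all cases: " + all);
    }
}
```

Since each max-capacity consumer can give up one partition and still sit at `minQuota`, this bound suggests the stealing loop always finds a donor under the stated assumptions.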



