You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/14 12:17:36 UTC

[GitHub] [kafka] rajinisivaram opened a new pull request, #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

rajinisivaram opened a new pull request, #12990:
URL: https://github.com/apache/kafka/pull/12990

   Best-effort rack alignment for range assignor when both consumer racks and partition racks are available with the protocol changes introduced in KIP-881. Rack-aware assignment is enabled by configuring `client.rack` for consumers. Balanced assignment per topic is prioritized over rack-alignment. For topics with equal partitions and the same set of subscribers, co-partitioning is prioritized over rack-alignment.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121387449


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
-            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
-                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
-                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
-                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(LinkedHashMap<String, Optional<String>> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers.keySet());
+        for (int i = 0; i < numPartitions; i++) {
+            int p = i;
+
+            Optional<String> matchingConsumer = remainingConsumers.stream()
+                    .filter(c -> assignmentStates.stream().allMatch(t -> t.racksMatch(c, new TopicPartition(t.topic, p)) && t.maxAssignable(c) > 0))
+                    .findFirst();
+            if (matchingConsumer.isPresent()) {
+                String consumer = matchingConsumer.get();
+                assignmentStates.forEach(t -> assign(consumer, Collections.singletonList(new TopicPartition(t.topic, p)), t, assignment));
+
+                if (assignmentStates.stream().noneMatch(t -> t.maxAssignable(consumer) > 0)) {
+                    remainingConsumers.remove(consumer);
+                    if (remainingConsumers.isEmpty())
+                        break;
+                }
             }
         }
-        return assignment;
+    }
+
+    private void assign(String consumer, List<TopicPartition> partitions, TopicAssignmentState assignmentState, Map<String, List<TopicPartition>> assignment) {
+        assignment.get(consumer).addAll(partitions);
+        assignmentState.onAssigned(consumer, partitions);
+    }
+
+    private Map<String, String> consumerRacks(Map<String, Subscription> subscriptions) {
+        Map<String, String> consumerRacks = new HashMap<>(subscriptions.size());
+        subscriptions.forEach((memberId, subscription) ->
+                subscription.rackId().filter(r -> !r.isEmpty()).ifPresent(rackId -> consumerRacks.put(memberId, rackId)));
+        return consumerRacks;
+    }
+
+    private class TopicAssignmentState {
+        private final String topic;
+        private final LinkedHashMap<String, Optional<String>> consumers;
+        private final boolean needsRackAwareAssignment;
+        private final Map<TopicPartition, Set<String>> partitionRacks;
+
+        private final List<TopicPartition> unassignedPartitions;
+        private final Map<String, Integer> numAssignedByConsumer;
+        private final int numPartitionsPerConsumer;
+        private int remainingConsumersWithExtraPartition;
+
+        public TopicAssignmentState(String topic, List<PartitionInfo> partitionInfos, List<MemberInfo> membersOrNull, Map<String, String> consumerRacks) {
+            this.topic = topic;
+            List<MemberInfo> members = membersOrNull == null ? Collections.emptyList() : membersOrNull;
+            Collections.sort(members);
+            consumers = members.stream().map(c -> c.memberId)
+                    .collect(Collectors.toMap(Function.identity(), c -> Optional.ofNullable(consumerRacks.get(c)), (a, b) -> a, LinkedHashMap::new));
+
+            this.unassignedPartitions = partitionInfos.stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                    .collect(Collectors.toCollection(LinkedList::new));

Review Comment:
   nit: I was wondering if using a `LinkedHashSet` would be better here because we have to remove partition from it in `onAssigned`. Did you consider this?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());

Review Comment:
   nit: I was wondering if we should directly limit based on `assignmentState.maxAssignable(consumer)` at this stage. For large groups with a large number of partitions, we don't really have to generate the full list as we limit it anyway right after. 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
-            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
-                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
-                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
-                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(LinkedHashMap<String, Optional<String>> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers.keySet());

Review Comment:
   nit: Should we also use a set here as we remove the selected consumer from it?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {

Review Comment:
   We have `FetchFromFollowerIntegrationTest`. Not sure if this what you were looking for.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -63,9 +77,26 @@
  * <li><code>I0: [t0p0, t0p1, t1p0, t1p1]</code>
  * <li><code>I1: [t0p2, t1p2]</code>
  * </ul>
+ * <p>
+ * Rack-aware assignment is used if both consumer and partition replica racks are available and
+ * some partitions have replicas only on a subset of racks. We attempt to match consumer racks with
+ * partition replica racks on a best-effort basis, prioritizing balanced assignment over rack alignment.
+ * Topics with equal partition count and same set of subscribers guarantee co-partitioning by prioritizing
+ * co-partitioning over rack alignment. In this case, aligning partition replicas of these topics on the
+ * same racks will improve locality for consumers. For example, if partitions 0 of all topics have a replica
+ * on rack 'a', partition 1 on rack 'b' etc., partition 0 of all topics can be assigned to a consumer
+ * on rack 'a', partition 1 to a consumer on rack 'b' and so on.
+ * <p>
+ * Note that rack-aware assignment currently takes all replicas into account, including any offline replicas
+ * and replicas that are not in the ISR. This is based on the assumption that these replicas are likely
+ * to join the ISR relatively soon. Since consumers don't rebalance on ISR change, this avoids unnecessary
+ * cross-rack traffic for long durations after replicas rejoin the ISR. In the future, we may consider
+ * rebalancing when replicas are added or removed to improve consumer rack alignment.
+ * </p>
  */
 public class RangeAssignor extends AbstractPartitionAssignor {
     public static final String RANGE_ASSIGNOR_NAME = "range";
+    private final static TopicPartitionComparator PARTITION_COMPARATOR = new TopicPartitionComparator();

Review Comment:
   nit: `final static` -> `static final` to be consistent with L98?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1115773500


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);

Review Comment:
   @dajac Thanks for the review. This `sort` is still done, I just moved it to TopicAssignmentState where the members are processed. All the tests in RangeAssignorTest are also run in rack-aware mode where all partitions have all racks to verify that the new algorithm gives the same result as the old algorithm. RangeAssignorTest.testStaticMemberRangeAssignmentPersistentAfterMemberIdChanges tests the scenario you described. I have added another version of the same test with a lower replication factor to verify the case where racks have a subset of partitions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112014546


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {
+            int p = i;

Review Comment:
   Needed `p` since the lamdas required a final variable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1115363600


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {

Review Comment:
   Yeah, I agree with you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121733442


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {

Review Comment:
   Ah, I missed that one, I think it is one of the new tests that was added when fixing FFF issues. I moved this test to that class. It uses 2 brokers instead of 3, but that seems ok for this test. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112016639


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {
+            int p = i;
 
-            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
-            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
-                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
-                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
-                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
+            Optional<String> matchingConsumer = remainingConsumers.stream()
+                    .filter(c -> assignmentStates.stream().allMatch(t -> t.racksMatch(c, new TopicPartition(t.topic, p)) && t.maxAssignable(c) > 0))
+                    .findFirst();
+            if (matchingConsumer.isPresent()) {
+                String consumer = matchingConsumer.get();
+                assignmentStates.forEach(t -> assign(consumer, Collections.singletonList(new TopicPartition(t.topic, p)), t, assignment));
+
+                if (assignmentStates.stream().noneMatch(t -> t.maxAssignable(consumer) > 0)) {
+                    remainingConsumers.remove(consumer);
+                    if (remainingConsumers.isEmpty())
+                        break;

Review Comment:
   As before, we fall-back to non-rack-aware assignment for all unassigned partitions after attempting rack-aware assignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1115362518


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);

Review Comment:
   @rajinisivaram I am trying to understand how we preserve the member order in the new algorithm. I am actually not sure that we preserve this property. It is important for when static member id are defined. Could we add a unit test which verifies that the assignment does not change when a member rejoins with a different member id but with the same static member id?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112011199


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -63,9 +76,19 @@
  * <li><code>I0: [t0p0, t0p1, t1p0, t1p1]</code>
  * <li><code>I1: [t0p2, t1p2]</code>
  * </ul>
+ * <p>
+ * Rack-aware assignment is used if both consumer and partition replica racks are available and
+ * some partitions have replicas only on a subset of racks. We attempt to match consumer racks with
+ * partition replica racks on a best-effort basis, prioritizing balanced assignment over rack alignment.
+ * Topics with equal partition count and same set of subscribers prioritize co-partitioning guarantee
+ * over rack alignment. In this case, aligning partition replicas of these topics on the same racks
+ * will improve locality for consumers. For example, if partitions 0 of all topics have a replica on
+ * rack 'a', partition 1 on rack 'b' etc., partition 0 of all topics can be assigned to a consumer
+ * on rack 'a', partition 1 to a consumer on rack 'b' and so on.

Review Comment:
   Yes, that is correct. 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112015848


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {
+            int p = i;
 
-            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
-            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
-                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
-                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
-                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
+            Optional<String> matchingConsumer = remainingConsumers.stream()
+                    .filter(c -> assignmentStates.stream().allMatch(t -> t.racksMatch(c, new TopicPartition(t.topic, p)) && t.maxAssignable(c) > 0))
+                    .findFirst();
+            if (matchingConsumer.isPresent()) {

Review Comment:
   We fall-back to non-rack-aware assignment for all unassigned partitions, using the same logic as today.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112029042


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {

Review Comment:
   I wanted to put this test into a class that had FFF enabled, but I couldn't find any integration tests for FFF. So included here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112025710


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {

Review Comment:
   I thought about these type of edge cases and felt that the logic is already much more complicated than it used to be. Handling edge cases would make it even more complex in terms of both implementation and testing. For example, in this particular scenario, we have n topics with p partitions and the n*p partitions could have replicas on different number of racks, but we want to co-partition. The current implementation works in a typical case where partition and consumer racks are uniform. We could extend further in follow-on PRs if we find that the edge cases are likely scenarios. What do you think?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java:
##########
@@ -302,10 +339,151 @@ public void testStaticMemberRangeAssignmentPersistentAfterMemberIdChanges() {
         assertEquals(staticAssignment, newStaticAssignment);
     }
 
-    static Map<String, List<TopicPartition>> checkStaticAssignment(AbstractPartitionAssignor assignor,
-                                                                   Map<String, Integer> partitionsPerTopic,
-                                                                   Map<String, Subscription> consumers) {
-        Map<String, List<TopicPartition>> assignmentByMemberId = assignor.assign(partitionsPerTopic, consumers);
+    @Test
+    public void testRackAwareAssignmentWithUniformSubscription() {
+        Map<String, Integer> topics = mkMap(mkEntry("t1", 6), mkEntry("t2", 7), mkEntry("t3", 2));
+        List<String> allTopics = asList("t1", "t2", "t3");
+        List<List<String>> consumerTopics = asList(allTopics, allTopics, allTopics);
+        List<String> nonRackAwareAssignment = asList(
+                "t1-0, t1-1, t2-0, t2-1, t2-2, t3-0",
+                "t1-2, t1-3, t2-3, t2-4, t3-1",
+                "t1-4, t1-5, t2-5, t2-6"
+        );
+
+        // Verify combinations where rack-aware logic is not used.
+        verifyNonRackAwareAssignment(topics, consumerTopics, nonRackAwareAssignment);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121729089


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());

Review Comment:
   Good idea, updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on PR #12990:
URL: https://github.com/apache/kafka/pull/12990#issuecomment-1450838774

   @dajac Thanks for the reviews. Test failures not related (have verified them locally), merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1109783046


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -63,9 +76,19 @@
  * <li><code>I0: [t0p0, t0p1, t1p0, t1p1]</code>
  * <li><code>I1: [t0p2, t1p2]</code>
  * </ul>
+ * <p>
+ * Rack-aware assignment is used if both consumer and partition replica racks are available and
+ * some partitions have replicas only on a subset of racks. We attempt to match consumer racks with
+ * partition replica racks on a best-effort basis, prioritizing balanced assignment over rack alignment.
+ * Topics with equal partition count and same set of subscribers prioritize co-partitioning guarantee
+ * over rack alignment. In this case, aligning partition replicas of these topics on the same racks
+ * will improve locality for consumers. For example, if partitions 0 of all topics have a replica on
+ * rack 'a', partition 1 on rack 'b' etc., partition 0 of all topics can be assigned to a consumer
+ * on rack 'a', partition 1 to a consumer on rack 'b' and so on.

Review Comment:
   > Topics with equal partition count and same set of subscribers prioritize co-partitioning guarantee over rack alignment.
   
   I would like to ensure that we are on the same point on this point. My understanding is that the current implementation guarantees the co-partitioning iff the topics have the same number of partitions. Am I correct?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {

Review Comment:
   nit: Would it make sense to use `forEach` here? I always find the `getKey` and `getValue` annoying.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {
+            int p = i;
 
-            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
-            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
-                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
-                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
-                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
+            Optional<String> matchingConsumer = remainingConsumers.stream()
+                    .filter(c -> assignmentStates.stream().allMatch(t -> t.racksMatch(c, new TopicPartition(t.topic, p)) && t.maxAssignable(c) > 0))
+                    .findFirst();
+            if (matchingConsumer.isPresent()) {

Review Comment:
   What do we do with the partition if we don't find a consumer?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {
+            int p = i;
 
-            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
-            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
-                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
-                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
-                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
+            Optional<String> matchingConsumer = remainingConsumers.stream()
+                    .filter(c -> assignmentStates.stream().allMatch(t -> t.racksMatch(c, new TopicPartition(t.topic, p)) && t.maxAssignable(c) > 0))
+                    .findFirst();
+            if (matchingConsumer.isPresent()) {
+                String consumer = matchingConsumer.get();
+                assignmentStates.forEach(t -> assign(consumer, Collections.singletonList(new TopicPartition(t.topic, p)), t, assignment));
+
+                if (assignmentStates.stream().noneMatch(t -> t.maxAssignable(consumer) > 0)) {
+                    remainingConsumers.remove(consumer);
+                    if (remainingConsumers.isEmpty())
+                        break;
+                }
             }
         }
-        return assignment;
+    }
+
+    private void assign(String consumer, List<TopicPartition> partitions, TopicAssignmentState assignmentState, Map<String, List<TopicPartition>> assignment) {
+        assignment.get(consumer).addAll(partitions);
+        assignmentState.onAssigned(consumer, partitions);
+    }
+
+    private class TopicAssignmentState {
+        private final String topic;
+        private final List<String> consumers;
+        private final boolean needsRackAwareAssignment;
+        private final Map<TopicPartition, Set<String>> partitionRacks;
+        private final Map<String, String> consumerRacks;

Review Comment:
   It feels a bit weird to have this mapping here because it will be same in all `TopicAssignmentState` if the subscriptions are the same for all consumers. Is there a reason why you decided to have it here?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {

Review Comment:
   I was wondering if there is any value in having an integration test with FFF enabled as well. I am really not sure. What do you think?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {
+    val partitionList = servers.indices.toList
+
+    val topicWithAllPartitionsOnAllRacks = "topicWithAllPartitionsOnAllRacks"
+    createTopic(topicWithAllPartitionsOnAllRacks, servers.size, servers.size)
+
+    // Racks are in order of broker ids, assign leaders in reverse order
+    val topicWithSingleRackPartitions = "topicWithSingleRackPartitions"
+    createTopicWithAssignment(topicWithSingleRackPartitions, partitionList.map(i => (i, Seq(servers.size - i - 1))).toMap)
+
+    // Create consumers with instance ids in ascending order, with racks in the same order.
+    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RangeAssignor].getName)
+    val consumers = servers.map { server =>
+      consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, server.config.rack.orNull)
+      consumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, s"instance-${server.config.brokerId}")
+      createConsumer()
+    }
+
+    val executor = Executors.newFixedThreadPool(consumers.size)
+    def waitForAssignments(assignments: List[Set[TopicPartition]]): Unit = {
+      val futures = consumers.zipWithIndex.map { case (consumer, i) =>
+        executor.submit(() => awaitAssignment(consumer, assignments(i)), 0)
+      }
+      futures.foreach(future => assertEquals(0, future.get(20, TimeUnit.SECONDS)))
+    }
 
-class RackAwareAssignor extends RoundRobinAssignor {
-  override def assign(partitionsPerTopic: util.Map[String, Integer], subscriptions: util.Map[String, ConsumerPartitionAssignor.Subscription]): util.Map[String, util.List[TopicPartition]] = {
-    assertEquals(1, subscriptions.size())
-    assertEquals(Optional.of("rack-a"), subscriptions.values.asScala.head.rackId)
-    super.assign(partitionsPerTopic, subscriptions)
+    try {
+      // Rack-based assignment results in partitions assigned in reverse order since partition racks are in the reverse order.
+      consumers.foreach(_.subscribe(Collections.singleton(topicWithSingleRackPartitions)))
+      waitForAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithSingleRackPartitions, p))))
+
+      // Non-rack-aware assignment results in ordered partitions.
+      consumers.foreach(_.subscribe(Collections.singleton(topicWithAllPartitionsOnAllRacks)))
+      waitForAssignments(partitionList.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p))))
+
+      // Rack-aware assignment with co-partitioning results in reverse assignment for both topics.
+      consumers.foreach(_.subscribe(Set(topicWithSingleRackPartitions, topicWithAllPartitionsOnAllRacks).asJava))
+      waitForAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p), new TopicPartition(topicWithSingleRackPartitions, p))))

Review Comment:
   When we have a consumer in the same zone as a leader, does the assignment algorithm guarantees that the leader in the same zone is selected all the time?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));

Review Comment:
   At first, it is not so clear why both `assignWithRackMatching` and `assignRanges` must be called. It may be worth adding a comment to explain the assignment logic here or some javadoc for `assignPartitions`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }

Review Comment:
   This makes sense. Unfortunately, we did not make the class final.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -29,24 +35,21 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable.Buffer
-import kafka.server.QuotaType
-import kafka.server.KafkaServer
-import org.apache.kafka.clients.admin.NewPartitions
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.common.config.TopicConfig
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.locks.ReentrantLock
 import scala.collection.mutable
+import scala.collection.mutable.Buffer
+import scala.jdk.CollectionConverters._
 
 /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
 class PlaintextConsumerTest extends BaseConsumerTest {
 
+  override def modifyConfigs(props: collection.Seq[Properties]): Unit = {
+    super.modifyConfigs(props)
+    props.zipWithIndex.foreach{ case (p, i) => p.setProperty(KafkaConfig.RackProp, i.toString) }

Review Comment:
   nit: I think that we usually put a space before `{`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {

Review Comment:
   I wonder if going over the partition is the best way. Imagine a case where the last partition ends up having only one rack available. This must be rare, I agree. However, you could end up in a situation where that partition could not be assigned to a consumer with the correct rack because all the consumers with that rack may have reached their maximum capacity.
   
   When I was thinking about it, I was wondering if it would be better to start with the most constraint partitions: the partitions with only one rack available, then the partitions with two racks, etc. Did you consider something like this? There is perhaps a downside that I haven't though about. Once concern is that it may be less deterministic and the determinism is important.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {
+            int p = i;

Review Comment:
   nit: I suppose that we could rename `i` to `p` and remove this one, isn't it?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java:
##########
@@ -302,10 +339,151 @@ public void testStaticMemberRangeAssignmentPersistentAfterMemberIdChanges() {
         assertEquals(staticAssignment, newStaticAssignment);
     }
 
-    static Map<String, List<TopicPartition>> checkStaticAssignment(AbstractPartitionAssignor assignor,
-                                                                   Map<String, Integer> partitionsPerTopic,
-                                                                   Map<String, Subscription> consumers) {
-        Map<String, List<TopicPartition>> assignmentByMemberId = assignor.assign(partitionsPerTopic, consumers);
+    @Test
+    public void testRackAwareAssignmentWithUniformSubscription() {
+        Map<String, Integer> topics = mkMap(mkEntry("t1", 6), mkEntry("t2", 7), mkEntry("t3", 2));
+        List<String> allTopics = asList("t1", "t2", "t3");
+        List<List<String>> consumerTopics = asList(allTopics, allTopics, allTopics);
+        List<String> nonRackAwareAssignment = asList(
+                "t1-0, t1-1, t2-0, t2-1, t2-2, t3-0",
+                "t1-2, t1-3, t2-3, t2-4, t3-1",
+                "t1-4, t1-5, t2-5, t2-6"
+        );
+
+        // Verify combinations where rack-aware logic is not used.
+        verifyNonRackAwareAssignment(topics, consumerTopics, nonRackAwareAssignment);

Review Comment:
   nit: Should we inline `nonRackAwareAssignment` like for the other ones?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {
+            int p = i;
 
-            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
-            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
-                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
-                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
-                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
+            Optional<String> matchingConsumer = remainingConsumers.stream()
+                    .filter(c -> assignmentStates.stream().allMatch(t -> t.racksMatch(c, new TopicPartition(t.topic, p)) && t.maxAssignable(c) > 0))
+                    .findFirst();
+            if (matchingConsumer.isPresent()) {
+                String consumer = matchingConsumer.get();
+                assignmentStates.forEach(t -> assign(consumer, Collections.singletonList(new TopicPartition(t.topic, p)), t, assignment));
+
+                if (assignmentStates.stream().noneMatch(t -> t.maxAssignable(consumer) > 0)) {
+                    remainingConsumers.remove(consumer);
+                    if (remainingConsumers.isEmpty())
+                        break;

Review Comment:
   What do we do with the partition if we end up here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram merged pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram merged PR #12990:
URL: https://github.com/apache/kafka/pull/12990


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on PR #12990:
URL: https://github.com/apache/kafka/pull/12990#issuecomment-1450159413

   @dajac Thanks for the reviews, I have addressed your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112026015


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -29,24 +35,21 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable.Buffer
-import kafka.server.QuotaType
-import kafka.server.KafkaServer
-import org.apache.kafka.clients.admin.NewPartitions
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.common.config.TopicConfig
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.locks.ReentrantLock
 import scala.collection.mutable
+import scala.collection.mutable.Buffer
+import scala.jdk.CollectionConverters._
 
 /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
 class PlaintextConsumerTest extends BaseConsumerTest {
 
+  override def modifyConfigs(props: collection.Seq[Properties]): Unit = {
+    super.modifyConfigs(props)
+    props.zipWithIndex.foreach{ case (p, i) => p.setProperty(KafkaConfig.RackProp, i.toString) }

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112011627


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));

Review Comment:
   Added javadoc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1115822582


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);

Review Comment:
   Thanks. I missed it in the `TopicAssignmentState`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112027236


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-    consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName)
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {
+    val partitionList = servers.indices.toList
+
+    val topicWithAllPartitionsOnAllRacks = "topicWithAllPartitionsOnAllRacks"
+    createTopic(topicWithAllPartitionsOnAllRacks, servers.size, servers.size)
+
+    // Racks are in order of broker ids, assign leaders in reverse order
+    val topicWithSingleRackPartitions = "topicWithSingleRackPartitions"
+    createTopicWithAssignment(topicWithSingleRackPartitions, partitionList.map(i => (i, Seq(servers.size - i - 1))).toMap)
+
+    // Create consumers with instance ids in ascending order, with racks in the same order.
+    consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RangeAssignor].getName)
+    val consumers = servers.map { server =>
+      consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, server.config.rack.orNull)
+      consumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, s"instance-${server.config.brokerId}")
+      createConsumer()
+    }
+
+    val executor = Executors.newFixedThreadPool(consumers.size)
+    def waitForAssignments(assignments: List[Set[TopicPartition]]): Unit = {
+      val futures = consumers.zipWithIndex.map { case (consumer, i) =>
+        executor.submit(() => awaitAssignment(consumer, assignments(i)), 0)
+      }
+      futures.foreach(future => assertEquals(0, future.get(20, TimeUnit.SECONDS)))
+    }
 
-class RackAwareAssignor extends RoundRobinAssignor {
-  override def assign(partitionsPerTopic: util.Map[String, Integer], subscriptions: util.Map[String, ConsumerPartitionAssignor.Subscription]): util.Map[String, util.List[TopicPartition]] = {
-    assertEquals(1, subscriptions.size())
-    assertEquals(Optional.of("rack-a"), subscriptions.values.asScala.head.rackId)
-    super.assign(partitionsPerTopic, subscriptions)
+    try {
+      // Rack-based assignment results in partitions assigned in reverse order since partition racks are in the reverse order.
+      consumers.foreach(_.subscribe(Collections.singleton(topicWithSingleRackPartitions)))
+      waitForAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithSingleRackPartitions, p))))
+
+      // Non-rack-aware assignment results in ordered partitions.
+      consumers.foreach(_.subscribe(Collections.singleton(topicWithAllPartitionsOnAllRacks)))
+      waitForAssignments(partitionList.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p))))
+
+      // Rack-aware assignment with co-partitioning results in reverse assignment for both topics.
+      consumers.foreach(_.subscribe(Set(topicWithSingleRackPartitions, topicWithAllPartitionsOnAllRacks).asJava))
+      waitForAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p), new TopicPartition(topicWithSingleRackPartitions, p))))

Review Comment:
   No, we don't do any leader-specific assignment, we are assuming all replicas are equal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on PR #12990:
URL: https://github.com/apache/kafka/pull/12990#issuecomment-1437118603

   @dajac Thanks for the review. I have addressed the comments and left some questions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112013804


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -76,43 +99,185 @@ private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
             String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
+            Subscription subscription = subscriptionEntry.getValue();
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            for (String topic : subscription.topics()) {
                 put(topicToConsumers, topic, memberInfo);
             }
         }
         return topicToConsumers;
     }
 
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey())))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
             assignment.put(memberId, new ArrayList<>());
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
+
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
+
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(List<String> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers);
+        for (int i = 0; i < numPartitions; i++) {
+            int p = i;
 
-            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
-            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
-                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
-                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
-                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
+            Optional<String> matchingConsumer = remainingConsumers.stream()
+                    .filter(c -> assignmentStates.stream().allMatch(t -> t.racksMatch(c, new TopicPartition(t.topic, p)) && t.maxAssignable(c) > 0))
+                    .findFirst();
+            if (matchingConsumer.isPresent()) {
+                String consumer = matchingConsumer.get();
+                assignmentStates.forEach(t -> assign(consumer, Collections.singletonList(new TopicPartition(t.topic, p)), t, assignment));
+
+                if (assignmentStates.stream().noneMatch(t -> t.maxAssignable(consumer) > 0)) {
+                    remainingConsumers.remove(consumer);
+                    if (remainingConsumers.isEmpty())
+                        break;
+                }
             }
         }
-        return assignment;
+    }
+
+    private void assign(String consumer, List<TopicPartition> partitions, TopicAssignmentState assignmentState, Map<String, List<TopicPartition>> assignment) {
+        assignment.get(consumer).addAll(partitions);
+        assignmentState.onAssigned(consumer, partitions);
+    }
+
+    private class TopicAssignmentState {
+        private final String topic;
+        private final List<String> consumers;
+        private final boolean needsRackAwareAssignment;
+        private final Map<TopicPartition, Set<String>> partitionRacks;
+        private final Map<String, String> consumerRacks;

Review Comment:
   It was useful to have the consumer racks along with the partition racks for each topic for matching racks. But we don't need to populate it for each topic. I have removed this map and updated the list of consumers of the topic to include the rack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121728688


##########
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##########
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+    private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
+                                        Map<String, List<TopicPartition>> assignment) {
 
-            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
-            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
-                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
-                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
-                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
+        assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
+            states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+                if (coPartitionedStates.size() > 1)
+                    assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment);
+                else {
+                    TopicAssignmentState state = coPartitionedStates.get(0);
+                    if (state.needsRackAwareAssignment)
+                        assignRanges(state, state::racksMatch, assignment);
+                }
+            });
+        });
+    }
+
+    private void assignCoPartitionedWithRackMatching(LinkedHashMap<String, Optional<String>> consumers,
+                                                     int numPartitions,
+                                                     Collection<TopicAssignmentState> assignmentStates,
+                                                     Map<String, List<TopicPartition>> assignment) {
+
+        List<String> remainingConsumers = new LinkedList<>(consumers.keySet());
+        for (int i = 0; i < numPartitions; i++) {
+            int p = i;
+
+            Optional<String> matchingConsumer = remainingConsumers.stream()
+                    .filter(c -> assignmentStates.stream().allMatch(t -> t.racksMatch(c, new TopicPartition(t.topic, p)) && t.maxAssignable(c) > 0))
+                    .findFirst();
+            if (matchingConsumer.isPresent()) {
+                String consumer = matchingConsumer.get();
+                assignmentStates.forEach(t -> assign(consumer, Collections.singletonList(new TopicPartition(t.topic, p)), t, assignment));
+
+                if (assignmentStates.stream().noneMatch(t -> t.maxAssignable(consumer) > 0)) {
+                    remainingConsumers.remove(consumer);
+                    if (remainingConsumers.isEmpty())
+                        break;
+                }
             }
         }
-        return assignment;
+    }
+
+    private void assign(String consumer, List<TopicPartition> partitions, TopicAssignmentState assignmentState, Map<String, List<TopicPartition>> assignment) {
+        assignment.get(consumer).addAll(partitions);
+        assignmentState.onAssigned(consumer, partitions);
+    }
+
+    private Map<String, String> consumerRacks(Map<String, Subscription> subscriptions) {
+        Map<String, String> consumerRacks = new HashMap<>(subscriptions.size());
+        subscriptions.forEach((memberId, subscription) ->
+                subscription.rackId().filter(r -> !r.isEmpty()).ifPresent(rackId -> consumerRacks.put(memberId, rackId)));
+        return consumerRacks;
+    }
+
+    private class TopicAssignmentState {
+        private final String topic;
+        private final LinkedHashMap<String, Optional<String>> consumers;
+        private final boolean needsRackAwareAssignment;
+        private final Map<TopicPartition, Set<String>> partitionRacks;
+
+        private final List<TopicPartition> unassignedPartitions;
+        private final Map<String, Integer> numAssignedByConsumer;
+        private final int numPartitionsPerConsumer;
+        private int remainingConsumersWithExtraPartition;
+
+        public TopicAssignmentState(String topic, List<PartitionInfo> partitionInfos, List<MemberInfo> membersOrNull, Map<String, String> consumerRacks) {
+            this.topic = topic;
+            List<MemberInfo> members = membersOrNull == null ? Collections.emptyList() : membersOrNull;
+            Collections.sort(members);
+            consumers = members.stream().map(c -> c.memberId)
+                    .collect(Collectors.toMap(Function.identity(), c -> Optional.ofNullable(consumerRacks.get(c)), (a, b) -> a, LinkedHashMap::new));
+
+            this.unassignedPartitions = partitionInfos.stream().map(p -> new TopicPartition(p.topic(), p.partition()))
+                    .collect(Collectors.toCollection(LinkedList::new));

Review Comment:
   Makes sense, updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] twmb commented on pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

Posted by "twmb (via GitHub)" <gi...@apache.org>.
twmb commented on PR #12990:
URL: https://github.com/apache/kafka/pull/12990#issuecomment-1452523214

   :wave: hi, seeing this after it's merged! Is the intent to create a new PR for making the sticky assignor rack aware? I see in the original PR #12914 that both range and sticky were improved. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org