You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/05/06 01:46:38 UTC
[kafka] branch trunk updated: KAFKA-12464: enhance constrained sticky Assign algorithm (#10509)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7922550 KAFKA-12464: enhance constrained sticky Assign algorithm (#10509)
7922550 is described below
commit 79225504ed920e63b2a31a968b7d50f88af5ada2
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Thu May 6 09:44:59 2021 +0800
KAFKA-12464: enhance constrained sticky Assign algorithm (#10509)
1. Make code simpler and cleaner
2. After this PR, testLargeAssignmentAndGroupWithUniformSubscription (1 million partitions) runs in ~1400 ms instead of ~2600 ms: a 46% reduction in run time, almost 2x faster.
Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>, Guozhang Wang <gu...@confluent.io>
---
.../consumer/internals/AbstractStickyAssignor.java | 267 +++++++++++++--------
.../internals/AbstractStickyAssignorTest.java | 145 +++++++++++
2 files changed, 313 insertions(+), 99 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 7e42e44..24c8107 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -28,9 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.Queue;
import java.util.Set;
-import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -82,6 +80,8 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+ "general case assignment algorithm");
partitionsTransferringOwnership = null;
+ // we don't need consumerToOwnedPartitions in general assign case
+ consumerToOwnedPartitions = null;
return generalAssign(partitionsPerTopic, subscriptions);
}
}
@@ -149,141 +149,210 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
* This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics.
* The method includes the following steps:
*
- * 1. Reassign as many previously owned partitions as possible, up to the maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
- * from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
- * should just distribute one partition each to all consumers at min capacity
+ * 1. Reassign previously owned partitions:
+ * a. if owned less than minQuota partitions, just assign all owned partitions, and put the member into unfilled member list
+ * b. if owned maxQuota or more, and we're still under the number of expected max capacity members, assign maxQuota partitions
+ * c. if owned at least "minQuota" of partitions, assign minQuota partitions, and put the member into unfilled member list if
+ * we're still under the number of expected max capacity members
+ * 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions
*
* @param partitionsPerTopic The number of partitions for each subscribed topic
* @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions
*
- * @return Map from each member to the list of partitions assigned to them.
+ * @return Map from each member to the list of partitions assigned to them.
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
- SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
+ if (log.isDebugEnabled()) {
+ log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}",
+ partitionsPerTopic, consumerToOwnedPartitions);
+ }
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
- // Each consumer should end up in exactly one of the below
- // the consumers not yet at capacity
+ // the consumers not yet at expected capacity
List<String> unfilledMembers = new LinkedList<>();
- // the members with exactly maxQuota partitions assigned
- Queue<String> maxCapacityMembers = new LinkedList<>();
- // the members with exactly minQuota partitions assigned
- Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
- int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
- int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
+ int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+ int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
+ int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
+ // the expected number of members with maxQuota assignment
+ int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers;
+ // the number of members with exactly maxQuota partitions assigned
+ int numMembersHavingMorePartitions = 0;
- // initialize the assignment map with an empty array of size minQuota for all members
+ // initialize the assignment map with an empty array of size maxQuota for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
- consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+ consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
+ List<TopicPartition> assignedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int i = 0;
- // assign the first N partitions up to the max quota, and mark the remaining as being revoked
- for (TopicPartition tp : ownedPartitions) {
- if (i < maxQuota) {
- consumerAssignment.add(tp);
- unassignedPartitions.remove(tp);
- } else {
- allRevokedPartitions.add(tp);
- }
- ++i;
- }
if (ownedPartitions.size() < minQuota) {
+ // the expected assignment size is more than consumer have now, so keep all the owned partitions
+ // and put this member into unfilled member list
+ if (ownedPartitions.size() > 0) {
+ consumerAssignment.addAll(ownedPartitions);
+ assignedPartitions.addAll(ownedPartitions);
+ }
unfilledMembers.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota && numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
+ // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
+ // so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
+ numMembersHavingMorePartitions++;
+ List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
+ consumerAssignment.addAll(maxQuotaPartitions);
+ assignedPartitions.addAll(maxQuotaPartitions);
+ allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
} else {
- // It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota
- if (consumerAssignment.size() == minQuota)
- minCapacityMembers.add(consumer);
- if (consumerAssignment.size() == maxQuota)
- maxCapacityMembers.add(consumer);
+ // consumer owned at least "minQuota" of partitions
+ // so keep "minQuota" of the owned partitions, and revoke the rest of the partitions
+ List<TopicPartition> minQuotaPartitions = ownedPartitions.subList(0, minQuota);
+ consumerAssignment.addAll(minQuotaPartitions);
+ assignedPartitions.addAll(minQuotaPartitions);
+ allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
+ // this consumer is potential maxQuota candidate since we're still under the number of expected max capacity members
+ if (numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
+ unfilledMembers.add(consumer);
+ }
}
}
+ List<TopicPartition> unassignedPartitions = getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, assignedPartitions);
+ assignedPartitions = null;
+
+ if (log.isDebugEnabled()) {
+ log.debug("After reassigning previously owned partitions, unfilled members: {}, unassigned partitions: {}, " +
+ "current assignment: {}", unfilledMembers, unassignedPartitions, assignment);
+ }
+
Collections.sort(unfilledMembers);
- Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();
-
- // Fill remaining members up to minQuota
- while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
- Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
-
- while (unfilledConsumerIter.hasNext()) {
- String consumer = unfilledConsumerIter.next();
- List<TopicPartition> consumerAssignment = assignment.get(consumer);
-
- if (unassignedPartitionsIter.hasNext()) {
- TopicPartition tp = unassignedPartitionsIter.next();
- consumerAssignment.add(tp);
- unassignedPartitionsIter.remove();
- // We already assigned all possible ownedPartitions, so we know this must be newly to this consumer
- if (allRevokedPartitions.contains(tp))
- partitionsTransferringOwnership.put(tp, consumer);
- } else {
- break;
- }
- if (consumerAssignment.size() == minQuota) {
- minCapacityMembers.add(consumer);
- unfilledConsumerIter.remove();
+ Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+ // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
+ for (TopicPartition unassignedPartition : unassignedPartitions) {
+ if (!unfilledConsumerIter.hasNext()) {
+ if (unfilledMembers.isEmpty()) {
+ // Should not enter here since we have calculated the exact number to assign to each consumer
+ // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
+ throw new IllegalStateException("No more unfilled consumers to be assigned.");
}
+ unfilledConsumerIter = unfilledMembers.iterator();
}
- }
-
- // If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
- // from the over-full consumers at max capacity
- for (String consumer : unfilledMembers) {
+ String consumer = unfilledConsumerIter.next();
List<TopicPartition> consumerAssignment = assignment.get(consumer);
- int remainingCapacity = minQuota - consumerAssignment.size();
- while (remainingCapacity > 0) {
- String overloadedConsumer = maxCapacityMembers.poll();
- if (overloadedConsumer == null) {
- throw new IllegalStateException("Some consumers are under capacity but all partitions have been assigned");
+ consumerAssignment.add(unassignedPartition);
+
+ // We already assigned all possible ownedPartitions, so we know this must be newly assigned to this consumer
+ if (allRevokedPartitions.contains(unassignedPartition))
+ partitionsTransferringOwnership.put(unassignedPartition, consumer);
+
+ int currentAssignedCount = consumerAssignment.size();
+ int expectedAssignedCount = numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions ? maxQuota : minQuota;
+ if (currentAssignedCount == expectedAssignedCount) {
+ if (currentAssignedCount == maxQuota) {
+ numMembersHavingMorePartitions++;
}
- TopicPartition swappedPartition = assignment.get(overloadedConsumer).remove(0);
- consumerAssignment.add(swappedPartition);
- --remainingCapacity;
- // This partition is by definition transferring ownership, the swapped partition must have come from
- // the max capacity member's owned partitions since it can only reach max capacity with owned partitions
- partitionsTransferringOwnership.put(swappedPartition, consumer);
+ unfilledConsumerIter.remove();
}
- minCapacityMembers.add(consumer);
}
- // Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
- // should just distribute one partition each to all consumers at min capacity
- for (TopicPartition unassignedPartition : unassignedPartitions) {
- String underCapacityConsumer = minCapacityMembers.poll();
- if (underCapacityConsumer == null) {
- throw new IllegalStateException("Some partitions are unassigned but all consumers are at maximum capacity");
+ if (!unfilledMembers.isEmpty()) {
+ // we expected all the remaining unfilled members have minQuota partitions and we're already at the allowed number
+ // of max capacity members. Otherwise, there must be error here.
+ if (numMembersHavingMorePartitions != expectedNumMembersHavingMorePartitions) {
+ throw new IllegalStateException(String.format("We haven't reached the allowed number of max capacity members, " +
+ "but no more partitions to be assigned to unfilled consumers: %s", unfilledMembers));
+ } else {
+ for (String unfilledMember : unfilledMembers) {
+ int assignedPartitionsCount = assignment.get(unfilledMember).size();
+ if (assignedPartitionsCount != minQuota) {
+ throw new IllegalStateException(String.format("Consumer: [%s] should have %d partitions, but got %d partitions, " +
+ "and no more partitions to be assigned", unfilledMember, minQuota, assignedPartitionsCount));
+ }
+ }
}
- // We can skip the bookkeeping of unassignedPartitions and maxCapacityMembers here since we are at the end
- assignment.get(underCapacityConsumer).add(unassignedPartition);
+ }
- if (allRevokedPartitions.contains(unassignedPartition))
- partitionsTransferringOwnership.put(unassignedPartition, underCapacityConsumer);
+ if (log.isDebugEnabled()) {
+ log.debug("Final assignment of partitions to consumers: \n{}", assignment);
}
return assignment;
}
- private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer> partitionsPerTopic) {
- SortedSet<TopicPartition> allPartitions =
- new TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
- for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
- String topic = entry.getKey();
- for (int i = 0; i < entry.getValue(); ++i) {
+ /**
+ * get the unassigned partition list by computing the difference set of all sorted partitions
+ * and sortedAssignedPartitions. If no assigned partitions, we'll just return all topic partitions.
+ *
+ * To compute the difference set, we use two pointers technique here:
+ *
+ * We loop through the all sorted topics, and then iterate all partitions the topic has,
+ * compared with the ith element in sortedAssignedPartitions(i starts from 0):
+ * - if not equal to the ith element, add to unassignedPartitions
+ * - if equal to the the ith element, get next element from sortedAssignedPartitions
+ *
+ * @param totalPartitionsCount all partitions counts in this assignment
+ * @param partitionsPerTopic the number of partitions for each subscribed topic.
+ * @param sortedAssignedPartitions sorted partitions, all are included in the sortedPartitions
+ * @return the partitions not yet assigned to any consumers
+ */
+ private List<TopicPartition> getUnassignedPartitions(int totalPartitionsCount,
+ Map<String, Integer> partitionsPerTopic,
+ List<TopicPartition> sortedAssignedPartitions) {
+ List<String> sortedAllTopics = new ArrayList<>(partitionsPerTopic.keySet());
+ // sort all topics first, then we can have sorted all topic partitions by adding partitions starting from 0
+ Collections.sort(sortedAllTopics);
+
+ if (sortedAssignedPartitions.isEmpty()) {
+ // no assigned partitions means all partitions are unassigned partitions
+ return getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, totalPartitionsCount);
+ }
+
+ List<TopicPartition> unassignedPartitions = new ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size());
+
+ Collections.sort(sortedAssignedPartitions, Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+
+ boolean shouldAddDirectly = false;
+ Iterator<TopicPartition> sortedAssignedPartitionsIter = sortedAssignedPartitions.iterator();
+ TopicPartition nextAssignedPartition = sortedAssignedPartitionsIter.next();
+
+ for (String topic : sortedAllTopics) {
+ int partitionCount = partitionsPerTopic.get(topic);
+ for (int i = 0; i < partitionCount; i++) {
+ if (shouldAddDirectly || !(nextAssignedPartition.topic().equals(topic) && nextAssignedPartition.partition() == i)) {
+ unassignedPartitions.add(new TopicPartition(topic, i));
+ } else {
+ // this partition is in assignedPartitions, don't add to unassignedPartitions, just get next assigned partition
+ if (sortedAssignedPartitionsIter.hasNext()) {
+ nextAssignedPartition = sortedAssignedPartitionsIter.next();
+ } else {
+ // add the remaining directly since there is no more sortedAssignedPartitions
+ shouldAddDirectly = true;
+ }
+ }
+ }
+ }
+
+ return unassignedPartitions;
+ }
+
+
+ private List<TopicPartition> getAllTopicPartitions(Map<String, Integer> partitionsPerTopic,
+ List<String> sortedAllTopics,
+ int totalPartitionsCount) {
+ List<TopicPartition> allPartitions = new ArrayList<>(totalPartitionsCount);
+
+ for (String topic : sortedAllTopics) {
+ int partitionCount = partitionsPerTopic.get(topic);
+ for (int i = 0; i < partitionCount; ++i) {
allPartitions.add(new TopicPartition(topic, i));
}
}
@@ -303,7 +372,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
* @param partitionsPerTopic The number of partitions for each subscribed topic.
* @param subscriptions Map from the member id to their respective topic subscription
*
- * @return Map from each member to the list of partitions assigned to them.
+ * @return Map from each member to the list of partitions assigned to them.
*/
private Map<String, List<TopicPartition>> generalAssign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
@@ -446,10 +515,10 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
/**
* determine if the current assignment is a balanced one
*
- * @param currentAssignment: the assignment whose balance needs to be checked
- * @param sortedCurrentSubscriptions: an ascending sorted set of consumers based on how many topic partitions are already assigned to them
- * @param allSubscriptions: a mapping of all consumers to all potential topic partitions that can be assigned to them
- * @return true if the given assignment is balanced; false otherwise
+ * @param currentAssignment the assignment whose balance needs to be checked
+ * @param sortedCurrentSubscriptions an ascending sorted set of consumers based on how many topic partitions are already assigned to them
+ * @param allSubscriptions a mapping of all consumers to all potential topic partitions that can be assigned to them
+ * @return true if the given assignment is balanced; false otherwise
*/
private boolean isBalanced(Map<String, List<TopicPartition>> currentAssignment,
TreeSet<String> sortedCurrentSubscriptions,
@@ -527,8 +596,8 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
* Sort valid partitions so they are processed in the potential reassignment phase in the proper order
* that causes minimal partition movement among consumers (hence honoring maximal stickiness)
*
- * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers
- * @return an ascending sorted list of topic partitions based on how many consumers can potentially use them
+ * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers
+ * @return an ascending sorted list of topic partitions based on how many consumers can potentially use them
*/
private List<TopicPartition> sortPartitions(Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
List<TopicPartition> sortedPartitions = new ArrayList<>(partition2AllPotentialConsumers.keySet());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index 3578540..c8f6c14 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -222,6 +222,101 @@ public abstract class AbstractStickyAssignorTest {
assertTrue(isFullyBalanced(assignment));
}
+ /**
+ * This unit test is testing consumer owned minQuota partitions, and expected to have maxQuota partitions situation
+ */
+ @Test
+ public void testConsumerOwningMinQuotaExpectedMaxQuota() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String consumer1 = "consumer1";
+ String consumer2 = "consumer2";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic1, 2);
+ partitionsPerTopic.put(topic2, 3);
+
+ List<String> subscribedTopics = topics(topic1, topic2);
+
+ subscriptions.put(consumer1,
+ buildSubscription(subscribedTopics, partitions(tp(topic1, 0), tp(topic2, 1))));
+ subscriptions.put(consumer2,
+ buildSubscription(subscribedTopics, partitions(tp(topic1, 1), tp(topic2, 2))));
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic1, 0), tp(topic2, 1), tp(topic2, 0)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic1, 1), tp(topic2, 2)), assignment.get(consumer2));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ /**
+ * This unit test is testing consumers owned maxQuota partitions are more than numExpectedMaxCapacityMembers situation
+ */
+ @Test
+ public void testMaxQuotaConsumerMoreThanNumExpectedMaxCapacityMembers() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String consumer1 = "consumer1";
+ String consumer2 = "consumer2";
+ String consumer3 = "consumer3";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic1, 2);
+ partitionsPerTopic.put(topic2, 2);
+
+ List<String> subscribedTopics = topics(topic1, topic2);
+
+ subscriptions.put(consumer1,
+ buildSubscription(subscribedTopics, partitions(tp(topic1, 0), tp(topic2, 0))));
+ subscriptions.put(consumer2,
+ buildSubscription(subscribedTopics, partitions(tp(topic1, 1), tp(topic2, 1))));
+ subscriptions.put(consumer3, buildSubscription(subscribedTopics, Collections.emptyList()));
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertEquals(partitions(tp(topic1, 0)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic1, 1), tp(topic2, 1)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic2, 0)), assignment.get(consumer3));
+
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ /**
+ * This unit test is testing all consumers owned less than minQuota partitions situation
+ */
+ @Test
+ public void testAllConsumerAreUnderMinQuota() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String consumer1 = "consumer1";
+ String consumer2 = "consumer2";
+ String consumer3 = "consumer3";
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic1, 2);
+ partitionsPerTopic.put(topic2, 3);
+
+ List<String> subscribedTopics = topics(topic1, topic2);
+
+ subscriptions.put(consumer1,
+ buildSubscription(subscribedTopics, partitions(tp(topic1, 0))));
+ subscriptions.put(consumer2,
+ buildSubscription(subscribedTopics, partitions(tp(topic1, 1))));
+ subscriptions.put(consumer3, buildSubscription(subscribedTopics, Collections.emptyList()));
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertEquals(partitions(tp(topic1, 0), tp(topic2, 0)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic1, 1), tp(topic2, 1)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic2, 2)), assignment.get(consumer3));
+
+ assertTrue(isFullyBalanced(assignment));
+ }
+
@Test
public void testAddRemoveConsumerOneTopic() {
String consumer1 = "consumer1";
@@ -256,6 +351,56 @@ public abstract class AbstractStickyAssignorTest {
assertTrue(isFullyBalanced(assignment));
}
+ @Test
+ public void testAddRemoveTwoConsumersTwoTopics() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String consumer1 = "consumer1";
+ String consumer2 = "consumer2";
+ String consumer3 = "consumer3";
+ String consumer4 = "consumer4";
+ List<String> allTopics = topics(topic1, topic2);
+
+ Map<String, Integer> partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic1, 3);
+ partitionsPerTopic.put(topic2, 4);
+ subscriptions.put(consumer1, new Subscription(allTopics));
+ subscriptions.put(consumer2, new Subscription(allTopics));
+
+ Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1), tp(topic2, 3)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), assignment.get(consumer2));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+
+ // add 2 consumers
+ subscriptions.put(consumer1, buildSubscription(allTopics, assignment.get(consumer1)));
+ subscriptions.put(consumer2, buildSubscription(allTopics, assignment.get(consumer2)));
+ subscriptions.put(consumer3, buildSubscription(allTopics, Collections.emptyList()));
+ subscriptions.put(consumer4, buildSubscription(allTopics, Collections.emptyList()));
+ assignment = assignor.assign(partitionsPerTopic, subscriptions);
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertEquals(partitions(tp(topic1, 0), tp(topic1, 2)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic1, 1), tp(topic2, 0)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic2, 1), tp(topic2, 3)), assignment.get(consumer3));
+ assertEquals(partitions(tp(topic2, 2)), assignment.get(consumer4));
+ assertTrue(isFullyBalanced(assignment));
+
+ // remove 2 consumers
+ subscriptions.remove(consumer1);
+ subscriptions.remove(consumer2);
+ subscriptions.put(consumer3, buildSubscription(allTopics, assignment.get(consumer3)));
+ subscriptions.put(consumer4, buildSubscription(allTopics, assignment.get(consumer4)));
+ assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic2, 1), tp(topic2, 3), tp(topic1, 0), tp(topic1, 2)), assignment.get(consumer3));
+ assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic2, 0)), assignment.get(consumer4));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
/**
* This unit test performs sticky assignment for a scenario that round robin assignor handles poorly.
* Topics (partitions per topic): topic1 (2), topic2 (1), topic3 (2), topic4 (1), topic5 (2)