You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/05/06 01:46:38 UTC

[kafka] branch trunk updated: KAFKA-12464: enhance constrained sticky Assign algorithm (#10509)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7922550  KAFKA-12464: enhance constrained sticky Assign algorithm (#10509)
7922550 is described below

commit 79225504ed920e63b2a31a968b7d50f88af5ada2
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Thu May 6 09:44:59 2021 +0800

    KAFKA-12464: enhance constrained sticky Assign algorithm (#10509)
    
    1. Make code simpler and cleaner
    2. After this PR, testLargeAssignmentAndGroupWithUniformSubscription (1 million partitions) runs in ~1400 ms, down from ~2600 ms: a ~46% reduction in run time, i.e. almost 2x faster.
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>, Guozhang Wang <gu...@confluent.io>
---
 .../consumer/internals/AbstractStickyAssignor.java | 267 +++++++++++++--------
 .../internals/AbstractStickyAssignorTest.java      | 145 +++++++++++
 2 files changed, 313 insertions(+), 99 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 7e42e44..24c8107 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -28,9 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.Set;
-import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
@@ -82,6 +80,8 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
             log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
                           + "general case assignment algorithm");
             partitionsTransferringOwnership = null;
+            // we don't need consumerToOwnedPartitions in general assign case
+            consumerToOwnedPartitions = null;
             return generalAssign(partitionsPerTopic, subscriptions);
         }
     }
@@ -149,141 +149,210 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
      * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics.
      * The method includes the following steps:
      *
-     * 1. Reassign as many previously owned partitions as possible, up to the maxQuota
-     * 2. Fill remaining members up to minQuota
-     * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
-     *    from the over-full consumers at max capacity
-     * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
-     *    should just distribute one partition each to all consumers at min capacity
+     * 1. Reassign previously owned partitions:
+     *   a. if owned less than minQuota partitions, just assign all owned partitions, and put the member into unfilled member list
+     *   b. if owned maxQuota or more, and we're still under the number of expected max capacity members, assign maxQuota partitions
+     *   c. if owned at least "minQuota" of partitions, assign minQuota partitions, and put the member into unfilled member list if
+     *     we're still under the number of expected max capacity members
+     * 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions
      *
      * @param partitionsPerTopic          The number of partitions for each subscribed topic
      * @param consumerToOwnedPartitions   Each consumer's previously owned and still-subscribed partitions
      *
-     * @return Map from each member to the list of partitions assigned to them.
+     * @return                            Map from each member to the list of partitions assigned to them.
      */
     private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
                                                                 Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
-        SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
+        if (log.isDebugEnabled()) {
+            log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}",
+                partitionsPerTopic, consumerToOwnedPartitions);
+        }
 
         Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-        // Each consumer should end up in exactly one of the below
-        // the consumers not yet at capacity
+        // the consumers not yet at expected capacity
         List<String> unfilledMembers = new LinkedList<>();
-        // the members with exactly maxQuota partitions assigned
-        Queue<String> maxCapacityMembers = new LinkedList<>();
-        // the members with exactly minQuota partitions assigned
-        Queue<String> minCapacityMembers = new LinkedList<>();
 
         int numberOfConsumers = consumerToOwnedPartitions.size();
-        int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
-        int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
+        int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
+        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
+        // the expected number of members with maxQuota assignment
+        int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers;
+        // the number of members with exactly maxQuota partitions assigned
+        int numMembersHavingMorePartitions = 0;
 
-        // initialize the assignment map with an empty array of size minQuota for all members
+        // initialize the assignment map with an empty array of size maxQuota for all members
         Map<String, List<TopicPartition>> assignment = new HashMap<>(
-            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+            consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
 
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
         // Reassign as many previously owned partitions as possible
         for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
             String consumer = consumerEntry.getKey();
             List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int i = 0;
-            // assign the first N partitions up to the max quota, and mark the remaining as being revoked
-            for (TopicPartition tp : ownedPartitions) {
-                if (i < maxQuota) {
-                    consumerAssignment.add(tp);
-                    unassignedPartitions.remove(tp);
-                } else {
-                    allRevokedPartitions.add(tp);
-                }
-                ++i;
-            }
 
             if (ownedPartitions.size() < minQuota) {
+                // the expected assignment size is more than consumer have now, so keep all the owned partitions
+                // and put this member into unfilled member list
+                if (ownedPartitions.size() > 0) {
+                    consumerAssignment.addAll(ownedPartitions);
+                    assignedPartitions.addAll(ownedPartitions);
+                }
                 unfilledMembers.add(consumer);
+            } else if (ownedPartitions.size() >= maxQuota && numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
+                // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
+                // so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
+                numMembersHavingMorePartitions++;
+                List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
+                consumerAssignment.addAll(maxQuotaPartitions);
+                assignedPartitions.addAll(maxQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
             } else {
-                // It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota
-                if (consumerAssignment.size() == minQuota)
-                    minCapacityMembers.add(consumer);
-                if (consumerAssignment.size() == maxQuota)
-                    maxCapacityMembers.add(consumer);
+                // consumer owned at least "minQuota" of partitions
+                // so keep "minQuota" of the owned partitions, and revoke the rest of the partitions
+                List<TopicPartition> minQuotaPartitions = ownedPartitions.subList(0, minQuota);
+                consumerAssignment.addAll(minQuotaPartitions);
+                assignedPartitions.addAll(minQuotaPartitions);
+                allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
+                // this consumer is potential maxQuota candidate since we're still under the number of expected max capacity members
+                if (numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
+                    unfilledMembers.add(consumer);
+                }
             }
         }
 
+        List<TopicPartition> unassignedPartitions = getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, assignedPartitions);
+        assignedPartitions = null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("After reassigning previously owned partitions, unfilled members: {}, unassigned partitions: {}, " +
+                "current assignment: {}", unfilledMembers, unassignedPartitions, assignment);
+        }
+
         Collections.sort(unfilledMembers);
-        Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();
-
-        // Fill remaining members up to minQuota
-        while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
-            Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
-
-            while (unfilledConsumerIter.hasNext()) {
-                String consumer = unfilledConsumerIter.next();
-                List<TopicPartition> consumerAssignment = assignment.get(consumer);
-
-                if (unassignedPartitionsIter.hasNext()) {
-                    TopicPartition tp = unassignedPartitionsIter.next();
-                    consumerAssignment.add(tp);
-                    unassignedPartitionsIter.remove();
-                    // We already assigned all possible ownedPartitions, so we know this must be newly to this consumer
-                    if (allRevokedPartitions.contains(tp))
-                        partitionsTransferringOwnership.put(tp, consumer);
-                } else {
-                    break;
-                }
 
-                if (consumerAssignment.size() == minQuota) {
-                    minCapacityMembers.add(consumer);
-                    unfilledConsumerIter.remove();
+        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
+        // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
+        for (TopicPartition unassignedPartition : unassignedPartitions) {
+            if (!unfilledConsumerIter.hasNext()) {
+                if (unfilledMembers.isEmpty()) {
+                    // Should not enter here since we have calculated the exact number to assign to each consumer
+                    // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
+                    throw new IllegalStateException("No more unfilled consumers to be assigned.");
                 }
+                unfilledConsumerIter = unfilledMembers.iterator();
             }
-        }
-
-        // If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
-        // from the over-full consumers at max capacity
-        for (String consumer : unfilledMembers) {
+            String consumer = unfilledConsumerIter.next();
             List<TopicPartition> consumerAssignment = assignment.get(consumer);
-            int remainingCapacity = minQuota - consumerAssignment.size();
-            while (remainingCapacity > 0) {
-                String overloadedConsumer = maxCapacityMembers.poll();
-                if (overloadedConsumer == null) {
-                    throw new IllegalStateException("Some consumers are under capacity but all partitions have been assigned");
+            consumerAssignment.add(unassignedPartition);
+
+            // We already assigned all possible ownedPartitions, so we know this must be newly assigned to this consumer
+            if (allRevokedPartitions.contains(unassignedPartition))
+                partitionsTransferringOwnership.put(unassignedPartition, consumer);
+
+            int currentAssignedCount = consumerAssignment.size();
+            int expectedAssignedCount = numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions ? maxQuota : minQuota;
+            if (currentAssignedCount == expectedAssignedCount) {
+                if (currentAssignedCount == maxQuota) {
+                    numMembersHavingMorePartitions++;
                 }
-                TopicPartition swappedPartition = assignment.get(overloadedConsumer).remove(0);
-                consumerAssignment.add(swappedPartition);
-                --remainingCapacity;
-                // This partition is by definition transferring ownership, the swapped partition must have come from
-                // the max capacity member's owned partitions since it can only reach max capacity with owned partitions
-                partitionsTransferringOwnership.put(swappedPartition, consumer);
+                unfilledConsumerIter.remove();
             }
-            minCapacityMembers.add(consumer);
         }
 
-        // Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
-        // should just distribute one partition each to all consumers at min capacity
-        for (TopicPartition unassignedPartition : unassignedPartitions) {
-            String underCapacityConsumer = minCapacityMembers.poll();
-            if (underCapacityConsumer == null) {
-                throw new IllegalStateException("Some partitions are unassigned but all consumers are at maximum capacity");
+        if (!unfilledMembers.isEmpty()) {
+            // we expected all the remaining unfilled members have minQuota partitions and we're already at the allowed number
+            // of max capacity members. Otherwise, there must be error here.
+            if (numMembersHavingMorePartitions != expectedNumMembersHavingMorePartitions) {
+                throw new IllegalStateException(String.format("We haven't reached the allowed number of max capacity members, " +
+                    "but no more partitions to be assigned to unfilled consumers: %s", unfilledMembers));
+            } else {
+                for (String unfilledMember : unfilledMembers) {
+                    int assignedPartitionsCount = assignment.get(unfilledMember).size();
+                    if (assignedPartitionsCount != minQuota) {
+                        throw new IllegalStateException(String.format("Consumer: [%s] should have %d partitions, but got %d partitions, " +
+                            "and no more partitions to be assigned", unfilledMember, minQuota, assignedPartitionsCount));
+                    }
+                }
             }
-            // We can skip the bookkeeping of unassignedPartitions and maxCapacityMembers here since we are at the end
-            assignment.get(underCapacityConsumer).add(unassignedPartition);
+        }
 
-            if (allRevokedPartitions.contains(unassignedPartition))
-                partitionsTransferringOwnership.put(unassignedPartition, underCapacityConsumer);
+        if (log.isDebugEnabled()) {
+            log.debug("Final assignment of partitions to consumers: \n{}", assignment);
         }
 
         return assignment;
     }
 
-    private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer> partitionsPerTopic) {
-        SortedSet<TopicPartition> allPartitions =
-            new TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-            String topic = entry.getKey();
-            for (int i = 0; i < entry.getValue(); ++i) {
+    /**
+     * get the unassigned partition list by computing the difference set of all sorted partitions
+     * and sortedAssignedPartitions. If no assigned partitions, we'll just return all topic partitions.
+     *
+     * To compute the difference set, we use two pointers technique here:
+     *
+     * We loop through the all sorted topics, and then iterate all partitions the topic has,
+     * compared with the ith element in sortedAssignedPartitions(i starts from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the ith element, get next element from sortedAssignedPartitions
+     *
+     * @param totalPartitionsCount      all partitions counts in this assignment
+     * @param partitionsPerTopic        the number of partitions for each subscribed topic.
+     * @param sortedAssignedPartitions  sorted partitions, all are included in the sortedPartitions
+     * @return                          the partitions not yet assigned to any consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(int totalPartitionsCount,
+                                                         Map<String, Integer> partitionsPerTopic,
+                                                         List<TopicPartition> sortedAssignedPartitions) {
+        List<String> sortedAllTopics = new ArrayList<>(partitionsPerTopic.keySet());
+        // sort all topics first, then we can have sorted all topic partitions by adding partitions starting from 0
+        Collections.sort(sortedAllTopics);
+
+        if (sortedAssignedPartitions.isEmpty()) {
+            // no assigned partitions means all partitions are unassigned partitions
+            return getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, totalPartitionsCount);
+        }
+
+        List<TopicPartition> unassignedPartitions = new ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size());
+
+        Collections.sort(sortedAssignedPartitions, Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+
+        boolean shouldAddDirectly = false;
+        Iterator<TopicPartition> sortedAssignedPartitionsIter = sortedAssignedPartitions.iterator();
+        TopicPartition nextAssignedPartition = sortedAssignedPartitionsIter.next();
+
+        for (String topic : sortedAllTopics) {
+            int partitionCount = partitionsPerTopic.get(topic);
+            for (int i = 0; i < partitionCount; i++) {
+                if (shouldAddDirectly || !(nextAssignedPartition.topic().equals(topic) && nextAssignedPartition.partition() == i)) {
+                    unassignedPartitions.add(new TopicPartition(topic, i));
+                } else {
+                    // this partition is in assignedPartitions, don't add to unassignedPartitions, just get next assigned partition
+                    if (sortedAssignedPartitionsIter.hasNext()) {
+                        nextAssignedPartition = sortedAssignedPartitionsIter.next();
+                    } else {
+                        // add the remaining directly since there is no more sortedAssignedPartitions
+                        shouldAddDirectly = true;
+                    }
+                }
+            }
+        }
+
+        return unassignedPartitions;
+    }
+
+
+    private List<TopicPartition> getAllTopicPartitions(Map<String, Integer> partitionsPerTopic,
+                                                       List<String> sortedAllTopics,
+                                                       int totalPartitionsCount) {
+        List<TopicPartition> allPartitions = new ArrayList<>(totalPartitionsCount);
+
+        for (String topic : sortedAllTopics) {
+            int partitionCount = partitionsPerTopic.get(topic);
+            for (int i = 0; i < partitionCount; ++i) {
                 allPartitions.add(new TopicPartition(topic, i));
             }
         }
@@ -303,7 +372,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
      * @param partitionsPerTopic         The number of partitions for each subscribed topic.
      * @param subscriptions              Map from the member id to their respective topic subscription
      *
-     * @return Map from each member to the list of partitions assigned to them.
+     * @return                           Map from each member to the list of partitions assigned to them.
      */
     private Map<String, List<TopicPartition>> generalAssign(Map<String, Integer> partitionsPerTopic,
                                                             Map<String, Subscription> subscriptions) {
@@ -446,10 +515,10 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
     /**
      * determine if the current assignment is a balanced one
      *
-     * @param currentAssignment: the assignment whose balance needs to be checked
-     * @param sortedCurrentSubscriptions: an ascending sorted set of consumers based on how many topic partitions are already assigned to them
-     * @param allSubscriptions: a mapping of all consumers to all potential topic partitions that can be assigned to them
-     * @return true if the given assignment is balanced; false otherwise
+     * @param currentAssignment             the assignment whose balance needs to be checked
+     * @param sortedCurrentSubscriptions    an ascending sorted set of consumers based on how many topic partitions are already assigned to them
+     * @param allSubscriptions              a mapping of all consumers to all potential topic partitions that can be assigned to them
+     * @return                              true if the given assignment is balanced; false otherwise
      */
     private boolean isBalanced(Map<String, List<TopicPartition>> currentAssignment,
                                TreeSet<String> sortedCurrentSubscriptions,
@@ -527,8 +596,8 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
      * Sort valid partitions so they are processed in the potential reassignment phase in the proper order
      * that causes minimal partition movement among consumers (hence honoring maximal stickiness)
      *
-     * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers
-     * @return  an ascending sorted list of topic partitions based on how many consumers can potentially use them
+     * @param partition2AllPotentialConsumers   a mapping of partitions to their potential consumers
+     * @return                                  an ascending sorted list of topic partitions based on how many consumers can potentially use them
      */
     private List<TopicPartition> sortPartitions(Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
         List<TopicPartition> sortedPartitions = new ArrayList<>(partition2AllPotentialConsumers.keySet());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index 3578540..c8f6c14 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -222,6 +222,101 @@ public abstract class AbstractStickyAssignorTest {
         assertTrue(isFullyBalanced(assignment));
     }
 
+    /**
+     * This unit test is testing consumer owned minQuota partitions, and expected to have maxQuota partitions situation
+     */
+    @Test
+    public void testConsumerOwningMinQuotaExpectedMaxQuota() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 2);
+        partitionsPerTopic.put(topic2, 3);
+
+        List<String> subscribedTopics = topics(topic1, topic2);
+
+        subscriptions.put(consumer1,
+            buildSubscription(subscribedTopics, partitions(tp(topic1, 0), tp(topic2, 1))));
+        subscriptions.put(consumer2,
+            buildSubscription(subscribedTopics, partitions(tp(topic1, 1), tp(topic2, 2))));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic1, 0), tp(topic2, 1), tp(topic2, 0)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic1, 1), tp(topic2, 2)), assignment.get(consumer2));
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    /**
+     * This unit test is testing consumers owned maxQuota partitions are more than numExpectedMaxCapacityMembers situation
+     */
+    @Test
+    public void testMaxQuotaConsumerMoreThanNumExpectedMaxCapacityMembers() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+        String consumer3 = "consumer3";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 2);
+        partitionsPerTopic.put(topic2, 2);
+
+        List<String> subscribedTopics = topics(topic1, topic2);
+
+        subscriptions.put(consumer1,
+            buildSubscription(subscribedTopics, partitions(tp(topic1, 0), tp(topic2, 0))));
+        subscriptions.put(consumer2,
+            buildSubscription(subscribedTopics, partitions(tp(topic1, 1), tp(topic2, 1))));
+        subscriptions.put(consumer3, buildSubscription(subscribedTopics, Collections.emptyList()));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertEquals(partitions(tp(topic1, 0)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic1, 1), tp(topic2, 1)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic2, 0)), assignment.get(consumer3));
+
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    /**
+     * This unit test is testing all consumers owned less than minQuota partitions situation
+     */
+    @Test
+    public void testAllConsumerAreUnderMinQuota() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+        String consumer3 = "consumer3";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 2);
+        partitionsPerTopic.put(topic2, 3);
+
+        List<String> subscribedTopics = topics(topic1, topic2);
+
+        subscriptions.put(consumer1,
+            buildSubscription(subscribedTopics, partitions(tp(topic1, 0))));
+        subscriptions.put(consumer2,
+            buildSubscription(subscribedTopics, partitions(tp(topic1, 1))));
+        subscriptions.put(consumer3, buildSubscription(subscribedTopics, Collections.emptyList()));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertEquals(partitions(tp(topic1, 0), tp(topic2, 0)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic1, 1), tp(topic2, 1)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic2, 2)), assignment.get(consumer3));
+
+        assertTrue(isFullyBalanced(assignment));
+    }
+
     @Test
     public void testAddRemoveConsumerOneTopic() {
         String consumer1 = "consumer1";
@@ -256,6 +351,56 @@ public abstract class AbstractStickyAssignorTest {
         assertTrue(isFullyBalanced(assignment));
     }
 
+    @Test
+    public void testAddRemoveTwoConsumersTwoTopics() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+        String consumer3 = "consumer3";
+        String consumer4 = "consumer4";
+        List<String> allTopics = topics(topic1, topic2);
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 3);
+        partitionsPerTopic.put(topic2, 4);
+        subscriptions.put(consumer1, new Subscription(allTopics));
+        subscriptions.put(consumer2, new Subscription(allTopics));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1), tp(topic2, 3)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), assignment.get(consumer2));
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+
+        // add 2 consumers
+        subscriptions.put(consumer1, buildSubscription(allTopics, assignment.get(consumer1)));
+        subscriptions.put(consumer2, buildSubscription(allTopics, assignment.get(consumer2)));
+        subscriptions.put(consumer3, buildSubscription(allTopics, Collections.emptyList()));
+        subscriptions.put(consumer4, buildSubscription(allTopics, Collections.emptyList()));
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertEquals(partitions(tp(topic1, 0), tp(topic1, 2)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic1, 1), tp(topic2, 0)), assignment.get(consumer2));
+        assertEquals(partitions(tp(topic2, 1), tp(topic2, 3)), assignment.get(consumer3));
+        assertEquals(partitions(tp(topic2, 2)), assignment.get(consumer4));
+        assertTrue(isFullyBalanced(assignment));
+
+        // remove 2 consumers
+        subscriptions.remove(consumer1);
+        subscriptions.remove(consumer2);
+        subscriptions.put(consumer3, buildSubscription(allTopics, assignment.get(consumer3)));
+        subscriptions.put(consumer4, buildSubscription(allTopics, assignment.get(consumer4)));
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(partitions(tp(topic2, 1), tp(topic2, 3), tp(topic1, 0), tp(topic1, 2)), assignment.get(consumer3));
+        assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic2, 0)), assignment.get(consumer4));
+
+        verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
     /**
      * This unit test performs sticky assignment for a scenario that round robin assignor handles poorly.
      * Topics (partitions per topic): topic1 (2), topic2 (1), topic3 (2), topic4 (1), topic5 (2)