You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/18 03:17:06 UTC

[1/2] kafka git commit: KAFKA-2273; Sticky partition assignment strategy (KIP-54)

Repository: kafka
Updated Branches:
  refs/heads/trunk 9815e18fe -> e1abf1770


http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 4a49833..e565ce2 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -399,7 +399,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     assertEquals(0, consumer0.assignment().size)
 
-    val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match this 
+    val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match this
     consumer0.subscribe(pattern1, new TestConsumerReassignmentListener)
     consumer0.poll(50)
 
@@ -883,6 +883,58 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  /**
+   * Inverts a poller id -> assigned partitions mapping into a partition -> poller id mapping.
+   * A partition listed under several ids is mapped to the first such id in the key iteration order of `m`.
+   */
+  def reverse(m: Map[Long, Set[TopicPartition]]) = {
+    val allPartitions = m.values.toSet.flatten
+    allPartitions.map { partition =>
+      val ownerId = m.keys.find(id => m(id).contains(partition)).get
+      partition -> ownerId
+    }.toMap
+  }
+
+  /**
+   * This test runs the following scenario to verify sticky assignor behavior.
+   * Topics: single-topic, with random number of partitions, where #par is 10, 20, 30, 40, 50, 60, 70, 80, 90, or 100
+   * Consumers: 9 consumers subscribed to the single topic
+   * Expected initial assignment: partitions are assigned to consumers in a round robin fashion.
+   *  - (#par mod 9) consumers will get (#par / 9 + 1) partitions, and the rest get (#par / 9) partitions
+   * Then consumer #10 is added to the list (subscribing to the same single topic)
+   * Expected new assignment:
+   *  - (#par / 10) partitions per consumer, where the (#par / 10) partitions of consumer #10 are taken from the
+   *    earlier consumers and every other partition stays put, leading to a total of (#par / 10) partition movements
+   */
+  @Test
+  def testMultiConsumerStickyAssignment() {
+    this.consumers.clear()
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "sticky-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[StickyAssignor].getName)
+
+    // create one new topic
+    val topic = "single-topic"
+    val rand = 1 + scala.util.Random.nextInt(10)
+    val partitions = createTopicAndSendRecords(topic, rand * 10, 100)
+
+    // create a group of consumers, subscribe the consumers to the single topic and start polling
+    // for the topic partition assignment
+    val (_, consumerPollers) = createConsumerGroupAndWaitForAssignment(9, List(topic), partitions)
+    validateGroupAssignment(consumerPollers, partitions, s"Did not get valid initial assignment for partitions ${partitions.asJava}")
+    val prePartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap)
+
+    // add one more consumer and validate re-assignment
+    addConsumersToGroupAndWaitForGroupAssignment(1, consumers, consumerPollers, List(topic), partitions)
+
+    val postPartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap)
+
+    // a partition counts as moved when its owner differs before and after the rebalance, or when it has an owner
+    // on only one side of the rebalance (Some(id) never equals None)
+    val keys = prePartition2PollerId.keySet.union(postPartition2PollerId.keySet)
+    val changes = keys.count(key => prePartition2PollerId.get(key) != postPartition2PollerId.get(key))
+
+    consumerPollers.foreach(_.shutdown())
+
+    assertEquals(s"Expected exactly $rand topic partitions to have switched to other consumers.", rand, changes)
+  }
+
   /**
    * This test re-uses BaseConsumerTest's consumers.
    * As a result, it is testing the default assignment strategy set by BaseConsumerTest
@@ -1477,8 +1529,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
    * Subscribes consumer 'consumer' to a given list of topics 'topicsToSubscribe', creates
    * consumer poller and starts polling.
    * Assumes that the consumer is not subscribed to any topics yet
-    *
-    * @param consumer consumer
+   *
+   * @param consumer consumer
    * @param topicsToSubscribe topics that this consumer will subscribe to
    * @return consumer poller for the given consumer
    */


[2/2] kafka git commit: KAFKA-2273; Sticky partition assignment strategy (KIP-54)

Posted by jg...@apache.org.
KAFKA-2273; Sticky partition assignment strategy (KIP-54)

This PR implements a new partition assignment strategy called "sticky", and its purpose is to balance partitions across consumers in a way that minimizes moving partitions around, or, in other words, preserves existing partition assignments as much as possible.

This patch is co-authored with rajinisivaram and edoardocomar.

Author: Vahid Hashemian <va...@us.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #1020 from vahidhashemian/KAFKA-2273


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e1abf177
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e1abf177
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e1abf177

Branch: refs/heads/trunk
Commit: e1abf17708918b82d3974ea028a4d74e3892fa0f
Parents: 9815e18
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Wed May 17 20:13:19 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 17 20:15:17 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/StickyAssignor.java  | 933 +++++++++++++++++++
 .../internals/AbstractPartitionAssignor.java    |   8 +-
 .../consumer/internals/ConsumerProtocol.java    |   1 -
 .../consumer/internals/PartitionAssignor.java   |   2 +-
 .../org/apache/kafka/common/TopicPartition.java |   1 -
 .../clients/consumer/StickyAssignorTest.java    | 689 ++++++++++++++
 .../kafka/api/PlaintextConsumerTest.scala       |  58 +-
 7 files changed, 1685 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
new file mode 100644
index 0000000..58e5915
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -0,0 +1,933 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either:
+ * - the numbers of topic partitions assigned to consumers differ by at most one; or
+ * - each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it.
+ * Second, it preserves as many existing assignments as possible when a reassignment occurs. This helps in saving some of the
+ * overhead processing when topic partitions move from one consumer to another.
+ *
+ * Starting fresh it would work by distributing the partitions over consumers as evenly as possible. Even though this may sound similar to
+ * how round robin assignor works, the second example below shows that it is not.
+ * During a reassignment it would perform the reassignment in such a way that in the new assignment
+ * 1. topic partitions are still distributed as evenly as possible, and
+ * 2. topic partitions stay with their previously assigned consumers as much as possible.
+ * Of course, the first goal above takes precedence over the second one.
+ *
+ * <b>Example 1.</b> Suppose there are three consumers <code>C0</code>, <code>C1</code>, <code>C2</code>,
+ * four topics <code>t0,</code> <code>t1</code>, <code>t2</code>, <code>t3</code>, and each topic has 2 partitions,
+ * resulting in partitions <code>t0p0</code>, <code>t0p1</code>, <code>t1p0</code>, <code>t1p1</code>, <code>t2p0</code>,
+ * <code>t2p1</code>, <code>t3p0</code>, <code>t3p1</code>. Each consumer is subscribed to all four topics.
+ *
+ * The assignment with both sticky and round robin assignors will be:
+ * <ul>
+ * <li><code>C0: [t0p0, t1p1, t3p0]</code></li>
+ * <li><code>C1: [t0p1, t2p0, t3p1]</code></li>
+ * <li><code>C2: [t1p0, t2p1]</code></li>
+ * </ul>
+ *
+ * Now, let's assume <code>C1</code> is removed and a reassignment is about to happen. The round robin assignor would produce:
+ * <ul>
+ * <li><code>C0: [t0p0, t1p0, t2p0, t3p0]</code></li>
+ * <li><code>C2: [t0p1, t1p1, t2p1, t3p1]</code></li>
+ * </ul>
+ *
+ * while the sticky assignor would result in:
+ * <ul>
+ * <li><code>C0 [t0p0, t1p1, t3p0, t2p0]</code></li>
+ * <li><code>C2 [t1p0, t2p1, t0p1, t3p1]</code></li>
+ * </ul>
+ * preserving all the previous assignments (unlike the round robin assignor).
+ *
+ * <b>Example 2.</b> There are three consumers <code>C0</code>, <code>C1</code>, <code>C2</code>,
+ * and three topics <code>t0</code>, <code>t1</code>, <code>t2</code>, with 1, 2, and 3 partitions respectively.
+ * Therefore, the partitions are <code>t0p0</code>, <code>t1p0</code>, <code>t1p1</code>, <code>t2p0</code>,
+ * <code>t2p1</code>, <code>t2p2</code>. <code>C0</code> is subscribed to <code>t0</code>; <code>C1</code> is subscribed to
+ * <code>t0</code>, <code>t1</code>; and <code>C2</code> is subscribed to <code>t0</code>, <code>t1</code>, <code>t2</code>.
+ *
+ * The round robin assignor would come up with the following assignment:
+ * <ul>
+ * <li><code>C0 [t0p0]</code></li>
+ * <li><code>C1 [t1p0]</code></li>
+ * <li><code>C2 [t1p1, t2p0, t2p1, t2p2]</code></li>
+ * </ul>
+ *
+ * which is not as balanced as the assignment suggested by sticky assignor:
+ * <ul>
+ * <li><code>C0 [t0p0]</code></li>
+ * <li><code>C1 [t1p0, t1p1]</code></li>
+ * <li><code>C2 [t2p0, t2p1, t2p2]</code></li>
+ * </ul>
+ *
+ * Now, if consumer <code>C0</code> is removed, these two assignors would produce the following assignments.
+ * Round Robin (preserves 3 partition assignments):
+ * <ul>
+ * <li><code>C1 [t0p0, t1p1]</code></li>
+ * <li><code>C2 [t1p0, t2p0, t2p1, t2p2]</code></li>
+ * </ul>
+ *
+ * Sticky (preserves 5 partition assignments):
+ * <ul>
+ * <li><code>C1 [t1p0, t1p1, t0p0]</code></li>
+ * <li><code>C2 [t2p0, t2p1, t2p2]</code></li>
+ * </ul>
+ *
+ * <h3>Impact on <code>ConsumerRebalanceListener</code></h3>
+ * The sticky assignment strategy can provide some optimization to those consumers that have some partition cleanup code
+ * in their <code>onPartitionsRevoked()</code> callback listeners. The cleanup code is placed in that callback listener
+ * because the consumer has no assumption or hope of preserving any of its assigned partitions after a rebalance when it
+ * is using range or round robin assignor. The listener code would look like this:
+ * <code>
+ * class TheOldRebalanceListener implements ConsumerRebalanceListener {
+ *
+ *   void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ *     for (TopicPartition partition: partitions) {
+ *       commitOffsets(partition);
+ *       cleanupState(partition);
+ *     }
+ *   }
+ *
+ *   void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ *     for (TopicPartition partition: partitions) {
+ *       initializeState(partition);
+ *       initializeOffset(partition);
+ *     }
+ *   }
+ * }
+ * </code>
+ *
+ * As mentioned above, one advantage of the sticky assignor is that, in general, it reduces the number of partitions that
+ * actually move from one consumer to another during a reassignment. Therefore, it allows consumers to do their cleanup
+ * more efficiently. Of course, they still can perform the partition cleanup in the <code>onPartitionsRevoked()</code>
+ * listener, but they can be more efficient and make a note of their partitions before and after the rebalance, and do the
+ * cleanup after the rebalance only on the partitions they have lost (which is normally not a lot). The code snippet below
+ * clarifies this point:
+ * <code>
+ * class TheNewRebalanceListener implements ConsumerRebalanceListener {
+ *   Collection<TopicPartition> lastAssignment = Collections.emptyList();
+ *
+ *   void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ *     for (TopicPartition partition: partitions)
+ *       commitOffsets(partition);
+ *   }
+ *
+ *   void onPartitionsAssigned(Collection<TopicPartition> assignment) {
+ *     for (TopicPartition partition: difference(lastAssignment, assignment))
+ *       cleanupState(partition);
+ *
+ *     for (TopicPartition partition: difference(assignment, lastAssignment))
+ *       initializeState(partition);
+ *
+ *     for (TopicPartition partition: assignment)
+ *       initializeOffset(partition);
+ *
+ *     this.lastAssignment = assignment;
+ *   }
+ * }
+ * </code>
+ *
+ * Any consumer that uses sticky assignment can leverage this listener like this:
+ * <code>consumer.subscribe(topics, new TheNewRebalanceListener());</code>
+ *
+ */
+public class StickyAssignor extends AbstractPartitionAssignor {
+    private static final Logger log = LoggerFactory.getLogger(StickyAssignor.class);
+
+    // these schemas are used for preserving consumer's previously assigned partitions
+    // list and sending it as user data to the leader during a rebalance
+    private static final String TOPIC_PARTITIONS_KEY_NAME = "previous_assignment";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+    private static final Schema TOPIC_ASSIGNMENT = new Schema(
+            new Field(TOPIC_KEY_NAME, Type.STRING),
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
+    private static final Schema STICKY_ASSIGNOR_USER_DATA = new Schema(
+            new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT)));
+
+    Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
+    private List<TopicPartition> memberAssignment = null;
+    private PartitionMovements partitionMovements;
+
+    /**
+     * Computes the assignment of topic partitions to consumers. Valid assignments already recorded in
+     * currentAssignment are kept, and the remaining partitions are handed to balance() together with the consumers
+     * sorted by how many topic partitions they already own.
+     *
+     * @param partitionsPerTopic the number of partitions of each topic that can be assigned
+     * @param subscriptions a mapping of each consumer to the topics it is subscribed to; subscribed topics that have
+     *                      no entry in partitionsPerTopic are ignored
+     * @return a mapping of each consumer to the topic partitions assigned to it
+     */
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, List<String>> subscriptions) {
+        partitionMovements = new PartitionMovements();
+
+        prepopulateCurrentAssignments();
+        // make a deep copy of currentAssignment
+        Map<String, List<TopicPartition>> oldAssignment = deepCopy(currentAssignment);
+
+        // a mapping of all topic partitions to all consumers that can be assigned to them
+        final HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>();
+        // a mapping of all consumers to all potential topic partitions that can be assigned to them
+        final HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>();
+
+        // initialize partition2AllPotentialConsumers and consumer2AllPotentialPartitions in the following two for loops
+        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
+            for (int i = 0; i < entry.getValue(); ++i)
+                partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<String>());
+        }
+
+        for (Entry<String, List<String>> entry: subscriptions.entrySet()) {
+            String consumer = entry.getKey();
+            consumer2AllPotentialPartitions.put(consumer, new ArrayList<TopicPartition>());
+            for (String topic: entry.getValue()) {
+                // a subscribed topic may have no entry in partitionsPerTopic; skip it rather than
+                // failing with a NullPointerException when the missing count is unboxed
+                Integer numPartitions = partitionsPerTopic.get(topic);
+                if (numPartitions == null)
+                    continue;
+
+                for (int i = 0; i < numPartitions; ++i) {
+                    TopicPartition topicPartition = new TopicPartition(topic, i);
+                    consumer2AllPotentialPartitions.get(consumer).add(topicPartition);
+                    partition2AllPotentialConsumers.get(topicPartition).add(consumer);
+                }
+            }
+
+            // add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
+            if (!currentAssignment.containsKey(consumer))
+                currentAssignment.put(consumer, new ArrayList<TopicPartition>());
+        }
+
+        // a mapping of partition to current consumer
+        HashMap<TopicPartition, String> currentPartitionConsumer = new HashMap<>();
+        for (Map.Entry<String, List<TopicPartition>> entry: currentAssignment.entrySet())
+            for (TopicPartition topicPartition: entry.getValue())
+                currentPartitionConsumer.put(topicPartition, entry.getKey());
+
+        List<TopicPartition> sortedPartitions = sortPartitions(oldAssignment.isEmpty(), partition2AllPotentialConsumers, consumer2AllPotentialPartitions);
+
+        // all partitions that need to be assigned (initially set to all partitions but adjusted in the following loop)
+        List<TopicPartition> unassignedPartitions = new ArrayList<>(sortedPartitions);
+        for (Iterator<Map.Entry<String, List<TopicPartition>>> it = currentAssignment.entrySet().iterator(); it.hasNext();) {
+            Map.Entry<String, List<TopicPartition>> entry = it.next();
+            if (!subscriptions.containsKey(entry.getKey())) {
+                // if a consumer that existed before (and had some partition assignments) is now removed, remove it from currentAssignment
+                for (TopicPartition topicPartition: entry.getValue())
+                    currentPartitionConsumer.remove(topicPartition);
+                it.remove();
+            } else {
+                // otherwise (the consumer still exists)
+                for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) {
+                    TopicPartition partition = partitionIter.next();
+                    if (!partition2AllPotentialConsumers.containsKey(partition)) {
+                        // if this topic partition of this consumer no longer exists remove it from currentAssignment of the consumer
+                        partitionIter.remove();
+                        currentPartitionConsumer.remove(partition);
+                    } else if (!subscriptions.get(entry.getKey()).contains(partition.topic())) {
+                        // if this partition cannot remain assigned to its current consumer because the consumer
+                        // is no longer subscribed to its topic remove it from currentAssignment of the consumer
+                        partitionIter.remove();
+                    } else
+                        // otherwise, remove the topic partition from those that need to be assigned only if
+                        // its current consumer is still subscribed to its topic (because it is already assigned
+                        // and we would want to preserve that assignment as much as possible)
+                        unassignedPartitions.remove(partition);
+                }
+            }
+        }
+        // at this point we have preserved all valid topic partition to consumer assignments and removed
+        // all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
+        // to consumers so that the topic partition assignments are as balanced as possible.
+
+        // an ascending sorted set of consumers based on how many topic partitions are already assigned to them
+        TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
+        sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
+
+        balance(sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions,
+                partition2AllPotentialConsumers, oldAssignment, currentPartitionConsumer);
+        return currentAssignment;
+    }
+
+    /**
+     * Rebuilds currentAssignment from the user data carried by the subscriptions returned by getSubscriptions().
+     * When no subscriptions are available currentAssignment is left untouched; members whose user data is
+     * missing or empty get no entry.
+     */
+    private void prepopulateCurrentAssignments() {
+        Map<String, Subscription> memberSubscriptions = getSubscriptions();
+        if (memberSubscriptions != null) {
+            currentAssignment.clear();
+            for (Map.Entry<String, Subscription> member : memberSubscriptions.entrySet()) {
+                ByteBuffer previousAssignmentData = member.getValue().userData();
+                boolean hasPreviousAssignment = previousAssignmentData != null && previousAssignmentData.hasRemaining();
+                if (hasPreviousAssignment)
+                    currentAssignment.put(member.getKey(), deserializeTopicPartitionAssignment(previousAssignmentData));
+            }
+        }
+    }
+
+    /**
+     * Stores the partitions of the received assignment in memberAssignment.
+     *
+     * @param assignment the assignment received by this member
+     */
+    @Override
+    public void onAssignment(Assignment assignment) {
+        memberAssignment = assignment.partitions();
+    }
+
+    /**
+     * Builds this member's subscription for the given topics. When memberAssignment has been set, it is
+     * serialized and attached to the subscription as user data; otherwise the subscription carries the topics only.
+     *
+     * @param topics the topics to subscribe to
+     * @return the subscription for the given topics
+     */
+    @Override
+    public Subscription subscription(Set<String> topics) {
+        List<String> topicList = new ArrayList<>(topics);
+        return memberAssignment == null
+                ? new Subscription(topicList)
+                : new Subscription(topicList, serializeTopicPartitionAssignment(memberAssignment));
+    }
+
+    /**
+     * @return the name of this assignor, "sticky"
+     */
+    @Override
+    public String name() {
+        return "sticky";
+    }
+
+    /**
+     * Determines whether the current assignment is a balanced one.
+     *
+     * @param sortedCurrentSubscriptions an ascending sorted set of consumers based on how many topic partitions are already assigned to them
+     * @param allSubscriptions a mapping of all consumers to all potential topic partitions that can be assigned to them
+     * @return true if the partition counts of the first and last consumers of the sorted set differ by at most one, or if no
+     *         consumer could take over a topic partition from a consumer that currently has more partitions than itself;
+     *         false otherwise
+     */
+    private boolean isBalanced(TreeSet<String> sortedCurrentSubscriptions, Map<String, List<TopicPartition>> allSubscriptions) {
+        int fewest = currentAssignment.get(sortedCurrentSubscriptions.first()).size();
+        int most = currentAssignment.get(sortedCurrentSubscriptions.last()).size();
+        // if the two ends of the sorted set differ by at most one partition the assignment is balanced
+        if (most - fewest <= 1)
+            return true;
+
+        // create a mapping from partitions to the consumer assigned to them
+        final Map<TopicPartition, String> partitionOwner = new HashMap<>();
+        for (Map.Entry<String, List<TopicPartition>> assigned: currentAssignment.entrySet()) {
+            String owner = assigned.getKey();
+            for (TopicPartition topicPartition: assigned.getValue()) {
+                if (partitionOwner.containsKey(topicPartition))
+                    log.error(topicPartition + " is assigned to more than one consumer.");
+                partitionOwner.put(topicPartition, owner);
+            }
+        }
+
+        // for each consumer that does not have all the topic partitions it can get, make sure that none of the
+        // topic partitions it could but did not get can be moved to it (otherwise the assignment is not balanced)
+        for (String consumer: sortedCurrentSubscriptions) {
+            List<TopicPartition> owned = currentAssignment.get(consumer);
+            int ownedCount = owned.size();
+            List<TopicPartition> candidates = allSubscriptions.get(consumer);
+
+            // only consumers that could still get more topic partitions need to be checked
+            if (ownedCount != candidates.size()) {
+                for (TopicPartition topicPartition: candidates) {
+                    if (owned.contains(topicPartition))
+                        continue;
+
+                    String otherConsumer = partitionOwner.get(topicPartition);
+                    int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size();
+                    if (ownedCount < otherConsumerPartitionCount) {
+                        log.debug(topicPartition + " can be moved from consumer " + otherConsumer + " to consumer " + consumer + " for a more balanced assignment.");
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * @return the balance score of the given assignment, computed as the sum, over all pairs of consumers, of the
+     * absolute difference between the numbers of partitions assigned to the two consumers.
+     * A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
+     * Lower balance score indicates a more balanced assignment.
+     */
+    private int getBalanceScore(Map<String, List<TopicPartition>> assignment) {
+        // only the assignment sizes matter for the score
+        List<Integer> sizes = new ArrayList<>(assignment.size());
+        for (List<TopicPartition> partitions: assignment.values())
+            sizes.add(partitions.size());
+
+        // visit every unordered pair of consumers exactly once
+        int score = 0;
+        for (int i = 0; i < sizes.size(); ++i)
+            for (int j = i + 1; j < sizes.size(); ++j)
+                score += Math.abs(sizes.get(i) - sizes.get(j));
+
+        return score;
+    }
+
+    /**
+     * Sort valid partitions so they are processed in the potential reassignment phase in the proper order
+     * that causes minimal partition movement among consumers (hence honoring maximal stickiness)
+     *
+     * @param isFreshAssignment whether this is a new assignment, or a reassignment of an existing one
+     * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers
+     * @param consumer2AllPotentialPartitions a mapping of consumers to potential partitions they can consume from
+     * @return sorted list of valid partitions
+     */
+    private List<TopicPartition> sortPartitions(boolean isFreshAssignment,
+                                                HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers,
+                                                HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
+        List<TopicPartition> sortedPartitions = new ArrayList<>();
+
+        if (!isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions)) {
+            // if this is a reassignment and the subscriptions are identical (all consumers can consume from all topics)
+            // then we just need to simply list partitions in a round robin fashion (from consumers with
+            // most assigned partitions to those with least)
+            Map<String, List<TopicPartition>> assignments = deepCopy(currentAssignment);
+            // drop the partitions that are no longer valid from the copied assignment
+            for (List<TopicPartition> partitions: assignments.values())
+                partitions.retainAll(partition2AllPotentialConsumers.keySet());
+
+            TreeSet<String> sortedConsumers = new TreeSet<>(new SubscriptionComparator(assignments));
+            sortedConsumers.addAll(assignments.keySet());
+
+            // repeatedly take one partition from the last consumer of the sorted set; the consumer is polled out
+            // before its list is modified and only added back while it still had a partition to give
+            while (!sortedConsumers.isEmpty()) {
+                String consumer = sortedConsumers.pollLast();
+                List<TopicPartition> remainingPartitions = assignments.get(consumer);
+                if (!remainingPartitions.isEmpty()) {
+                    sortedPartitions.add(remainingPartitions.remove(0));
+                    sortedConsumers.add(consumer);
+                }
+            }
+
+            // append the valid partitions that are not listed yet; tracking the listed partitions in a hash set
+            // avoids a linear scan of sortedPartitions for every partition
+            Set<TopicPartition> listedPartitions = new HashSet<>(sortedPartitions);
+            for (TopicPartition partition: partition2AllPotentialConsumers.keySet()) {
+                if (listedPartitions.add(partition))
+                    sortedPartitions.add(partition);
+            }
+
+        } else {
+            // an ascending sorted set of topic partitions based on how many consumers can potentially use them
+            TreeSet<TopicPartition> sortedAllPartitions = new TreeSet<>(new PartitionComparator(partition2AllPotentialConsumers));
+            sortedAllPartitions.addAll(partition2AllPotentialConsumers.keySet());
+
+            // a TreeSet iterates in ascending order, so this lists the partitions from first to last
+            sortedPartitions.addAll(sortedAllPartitions);
+        }
+
+        return sortedPartitions;
+    }
+
+    /**
+     * @param partition2AllPotentialConsumers a mapping of partitions to their potential consumers
+     * @param consumer2AllPotentialPartitions a mapping of consumers to potential partitions they can consume from
+     * @return true if potential consumers of partitions are the same, and potential partitions consumers can
+     * consume from are the same too
+     */
+    private boolean areSubscriptionsIdentical(HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers,
+                                              HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
+        // the consumer side is only inspected when all partitions share the same potential consumers
+        return hasIdenticalListElements(partition2AllPotentialConsumers.values())
+                && hasIdenticalListElements(consumer2AllPotentialPartitions.values());
+    }
+
+    /**
+     * @param col a collection of elements of type list
+     * @return true if all lists in the collection have the same members (which is trivially the case for an
+     * empty collection); false otherwise
+     */
+    private <T> boolean hasIdenticalListElements(Collection<List<T>> col) {
+        Iterator<List<T>> it = col.iterator();
+        // nothing to compare in an empty collection; without this guard it.next() below would throw
+        if (!it.hasNext())
+            return true;
+
+        // comparing each list with its predecessor is enough, since having the same members is transitive
+        List<T> cur = it.next();
+        while (it.hasNext()) {
+            List<T> next = it.next();
+            if (!(cur.containsAll(next) && next.containsAll(cur)))
+                return false;
+            cur = next;
+        }
+        return true;
+    }
+
+    /**
+     * @return the consumer to which the given partition is assigned. The assignment should improve the overall balance
+     * of the partition assignments to consumers.
+     */
+    private String assignPartition(TopicPartition partition, TreeSet<String> sortedCurrentSubscriptions,
+            HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions, HashMap<TopicPartition, String> currentPartitionConsumer) {
+        for (String consumer: sortedCurrentSubscriptions) {
+            if (consumer2AllPotentialPartitions.get(consumer).contains(partition)) {
+                sortedCurrentSubscriptions.remove(consumer);
+                currentAssignment.get(consumer).add(partition);
+                currentPartitionConsumer.put(partition, consumer);
+                sortedCurrentSubscriptions.add(consumer);
+                return consumer;
+            }
+        }
+        return null;
+    }
+
+    private boolean canParticipateInReassignment(TopicPartition partition, HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+        // if a partition has two or more potential consumers it is subject to reassignment.
+        return partition2AllPotentialConsumers.get(partition).size() >= 2;
+    }
+
+    private boolean canParticipateInReassignment(String consumer,
+                                                 HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions,
+                                                 HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+        List<TopicPartition> currentPartitions = currentAssignment.get(consumer);
+        int currentAssignmentSize = currentPartitions.size();
+        int maxAssignmentSize = consumer2AllPotentialPartitions.get(consumer).size();
+        if (currentAssignmentSize > maxAssignmentSize)
+            log.error("The consumer " + consumer + " is assigned more partitions than the maximum possible.");
+
+        if (currentAssignmentSize < maxAssignmentSize)
+            // if a consumer is not assigned all its potential partitions it is subject to reassignment
+            return true;
+
+        for (TopicPartition partition: currentPartitions)
+            // if any of the partitions assigned to a consumer is subject to reassignment the consumer itself
+            // is subject to reassignment
+            if (canParticipateInReassignment(partition, partition2AllPotentialConsumers))
+                return true;
+
+        return false;
+    }
+
+    /**
+     * Balance the current assignment using the data structures created in the assign(...) method above.
+     */
+    private void balance(List<TopicPartition> sortedPartitions, List<TopicPartition> unassignedPartitions, TreeSet<String> sortedCurrentSubscriptions,
+                         HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions, HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers,
+                         Map<String, List<TopicPartition>> oldAssignment, HashMap<TopicPartition, String> currentPartitionConsumer) {
+        boolean initializing = currentAssignment.get(sortedCurrentSubscriptions.last()).isEmpty();
+        boolean reassignmentPerformed = false;
+
+        // assign all unassigned partitions
+        for (TopicPartition partition: unassignedPartitions) {
+            // skip if there is no potential consumer for the partition
+            if (partition2AllPotentialConsumers.get(partition).isEmpty())
+                continue;
+
+            assignPartition(partition, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, currentPartitionConsumer);
+        }
+
+        // narrow down the reassignment scope to only those partitions that can actually be reassigned
+        Set<TopicPartition> fixedPartitions = new HashSet<>();
+        for (TopicPartition partition: partition2AllPotentialConsumers.keySet())
+            if (!canParticipateInReassignment(partition, partition2AllPotentialConsumers))
+                fixedPartitions.add(partition);
+        sortedPartitions.removeAll(fixedPartitions);
+
+        // narrow down the reassignment scope to only those consumers that are subject to reassignment
+        Map<String, List<TopicPartition>> fixedAssignments = new HashMap<>();
+        for (String consumer: consumer2AllPotentialPartitions.keySet())
+            if (!canParticipateInReassignment(consumer, consumer2AllPotentialPartitions, partition2AllPotentialConsumers)) {
+                sortedCurrentSubscriptions.remove(consumer);
+                fixedAssignments.put(consumer, currentAssignment.remove(consumer));
+            }
+
+        // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
+        Map<String, List<TopicPartition>> preBalanceAssignment = deepCopy(currentAssignment);
+        HashMap<TopicPartition, String> preBalancePartitionConsumers = new HashMap<>(currentPartitionConsumer);
+
+        reassignmentPerformed = performReassignments(sortedPartitions, sortedCurrentSubscriptions,
+                consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer);
+
+        // if we are not preserving existing assignments and we have made changes to the current assignment
+        // make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
+        if (!initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment)) {
+            deepCopy(preBalanceAssignment, currentAssignment);
+            currentPartitionConsumer.clear();
+            currentPartitionConsumer.putAll(preBalancePartitionConsumers);
+        }
+
+        // add the fixed assignments (those that could not change) back
+        for (Entry<String, List<TopicPartition>> entry: fixedAssignments.entrySet()) {
+            String consumer = entry.getKey();
+            currentAssignment.put(consumer, entry.getValue());
+            sortedCurrentSubscriptions.add(consumer);
+        }
+
+        fixedAssignments.clear();
+    }
+
+    private boolean performReassignments(List<TopicPartition> reassignablePartitions, TreeSet<String> sortedCurrentSubscriptions,
+                                         HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions,
+                                         HashMap<TopicPartition, List<String>> partition2AllPotentialConsumers,
+                                         HashMap<TopicPartition, String> currentPartitionConsumer) {
+        boolean reassignmentPerformed = false;
+        boolean modified;
+
+        // repeat reassignment until no partition can be moved to improve the balance
+        do {
+            modified = false;
+            // reassign reassignable partitions as needed (starting from the partition with the fewest potential consumers)
+            // until the full list is processed or a balance is achieved
+            Iterator<TopicPartition> partitionIterator = reassignablePartitions.iterator();
+            while (partitionIterator.hasNext() && !isBalanced(sortedCurrentSubscriptions, consumer2AllPotentialPartitions)) {
+                TopicPartition partition = partitionIterator.next();
+
+                // the partition must have at least two consumers
+                if (partition2AllPotentialConsumers.get(partition).size() <= 1)
+                    log.error("Expected more than one potential consumer for partition '" + partition + "'");
+
+                // the partition must have a current consumer
+                String consumer = currentPartitionConsumer.get(partition);
+                if (consumer == null)
+                    log.error("Expected partition '" + partition + "' to be assigned to a consumer");
+
+                // check if a better-suited consumer exists for the partition; if so, reassign it
+                for (String otherConsumer: partition2AllPotentialConsumers.get(partition)) {
+                    if (currentAssignment.get(consumer).size() > currentAssignment.get(otherConsumer).size() + 1) {
+                        reassignPartition(partition, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions);
+                        reassignmentPerformed = true;
+                        modified = true;
+                        break;
+                    }
+                }
+            }
+        } while (modified);
+
+        return reassignmentPerformed;
+    }
+
+    private void reassignPartition(TopicPartition partition, TreeSet<String> sortedCurrentSubscriptions,
+                                   HashMap<TopicPartition, String> currentPartitionConsumer,
+                                   HashMap<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
+        String consumer = currentPartitionConsumer.get(partition);
+
+        // find the new consumer
+        String newConsumer = null;
+        for (String anotherConsumer: sortedCurrentSubscriptions) {
+            if (consumer2AllPotentialPartitions.get(anotherConsumer).contains(partition)) {
+                newConsumer = anotherConsumer;
+                break;
+            }
+        }
+
+        assert newConsumer != null;
+
+        // find the correct partition movement considering the stickiness requirement
+        TopicPartition partitionToBeMoved = partitionMovements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer);
+        processPartitionMovement(partitionToBeMoved, newConsumer, sortedCurrentSubscriptions, currentPartitionConsumer);
+
+        return;
+    }
+
+    private void processPartitionMovement(TopicPartition partition, String newConsumer,
+                                          TreeSet<String> sortedCurrentSubscriptions,
+                                          HashMap<TopicPartition, String> currentPartitionConsumer) {
+        String oldConsumer = currentPartitionConsumer.get(partition);
+
+        sortedCurrentSubscriptions.remove(oldConsumer);
+        sortedCurrentSubscriptions.remove(newConsumer);
+
+        partitionMovements.movePartition(partition, oldConsumer, newConsumer);
+
+        currentAssignment.get(oldConsumer).remove(partition);
+        currentAssignment.get(newConsumer).add(partition);
+        currentPartitionConsumer.put(partition, newConsumer);
+        sortedCurrentSubscriptions.add(newConsumer);
+        sortedCurrentSubscriptions.add(oldConsumer);
+    }
+
+    boolean isSticky() {
+        return partitionMovements.isSticky();
+    }
+
+    private static ByteBuffer serializeTopicPartitionAssignment(List<TopicPartition> partitions) {
+        Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA);
+        List<Struct> topicAssignments = new ArrayList<>();
+        for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupDataByTopic(partitions).entrySet()) {
+            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
+            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
+            topicAssignments.add(topicAssignment);
+        }
+        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
+        ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA.sizeOf(struct));
+        STICKY_ASSIGNOR_USER_DATA.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    private static List<TopicPartition> deserializeTopicPartitionAssignment(ByteBuffer buffer) {
+        Struct struct = STICKY_ASSIGNOR_USER_DATA.read(buffer);
+        List<TopicPartition> partitions = new ArrayList<>();
+        for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) {
+            Struct assignment = (Struct) structObj;
+            String topic = assignment.getString(TOPIC_KEY_NAME);
+            for (Object partitionObj : assignment.getArray(PARTITIONS_KEY_NAME)) {
+                Integer partition = (Integer) partitionObj;
+                partitions.add(new TopicPartition(topic, partition));
+            }
+        }
+        return partitions;
+    }
+
+    private void deepCopy(Map<String, List<TopicPartition>> source, Map<String, List<TopicPartition>> dest) {
+        dest.clear();
+        for (Entry<String, List<TopicPartition>> entry: source.entrySet())
+            dest.put(entry.getKey(), new ArrayList<>(entry.getValue()));
+    }
+
+    private Map<String, List<TopicPartition>> deepCopy(Map<String, List<TopicPartition>> assignment) {
+        Map<String, List<TopicPartition>> copy = new HashMap<>();
+        deepCopy(assignment, copy);
+        return copy;
+    }
+
+    private static class PartitionComparator implements Comparator<TopicPartition>, Serializable {
+        private static final long serialVersionUID = 1L;
+        private Map<TopicPartition, List<String>> map;
+
+        PartitionComparator(Map<TopicPartition, List<String>> map) {
+            this.map = map;
+        }
+
+        @Override
+        public int compare(TopicPartition o1, TopicPartition o2) {
+            int ret = map.get(o1).size() - map.get(o2).size();
+            if (ret == 0) {
+                ret = o1.topic().compareTo(o2.topic());
+                if (ret == 0)
+                    ret = o1.partition() - o2.partition();
+            }
+            return ret;
+        }
+    }
+
+    private static class SubscriptionComparator implements Comparator<String>, Serializable {
+        private static final long serialVersionUID = 1L;
+        private Map<String, List<TopicPartition>> map;
+
+        SubscriptionComparator(Map<String, List<TopicPartition>> map) {
+            this.map = map;
+        }
+
+        @Override
+        public int compare(String o1, String o2) {
+            int ret = map.get(o1).size() - map.get(o2).size();
+            if (ret == 0)
+                ret = o1.compareTo(o2);
+            return ret;
+        }
+    }
+
+    /**
+     * This class maintains some data structures to simplify lookup of partition movements among consumers. At each point of
+     * time during a partition rebalance it keeps track of partition movements corresponding to each topic, and also possible
+     * movement (in the form of a <code>ConsumerPair</code> object) for each partition.
+     */
+    private static class PartitionMovements {
+        private Map<String, Map<ConsumerPair, Set<TopicPartition>>> partitionMovementsByTopic = new HashMap<>();
+        private Map<TopicPartition, ConsumerPair> partitionMovements = new HashMap<>();
+
+        private ConsumerPair removeMovementRecordOfPartition(TopicPartition partition) {
+            ConsumerPair pair = partitionMovements.remove(partition);
+
+            String topic = partition.topic();
+            Map<ConsumerPair, Set<TopicPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic);
+            partitionMovementsForThisTopic.get(pair).remove(partition);
+            if (partitionMovementsForThisTopic.get(pair).isEmpty())
+                partitionMovementsForThisTopic.remove(pair);
+            if (partitionMovementsByTopic.get(topic).isEmpty())
+                partitionMovementsByTopic.remove(topic);
+
+            return pair;
+        }
+
+        private void addPartitionMovementRecord(TopicPartition partition, ConsumerPair pair) {
+            partitionMovements.put(partition, pair);
+
+            String topic = partition.topic();
+            if (!partitionMovementsByTopic.containsKey(topic))
+                partitionMovementsByTopic.put(topic, new HashMap<ConsumerPair, Set<TopicPartition>>());
+
+            Map<ConsumerPair, Set<TopicPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic);
+            if (!partitionMovementsForThisTopic.containsKey(pair))
+                partitionMovementsForThisTopic.put(pair, new HashSet<TopicPartition>());
+
+            partitionMovementsForThisTopic.get(pair).add(partition);
+        }
+
+        private void movePartition(TopicPartition partition, String oldConsumer, String newConsumer) {
+            ConsumerPair pair = new ConsumerPair(oldConsumer, newConsumer);
+
+            if (partitionMovements.containsKey(partition)) {
+                // this partition has previously moved
+                ConsumerPair existingPair = removeMovementRecordOfPartition(partition);
+                assert existingPair.dstMemberId.equals(oldConsumer);
+                if (!existingPair.srcMemberId.equals(newConsumer)) {
+                    // the partition is not moving back to its previous consumer
+                    // return new ConsumerPair2(existingPair.src, newConsumer);
+                    addPartitionMovementRecord(partition, new ConsumerPair(existingPair.srcMemberId, newConsumer));
+                }
+            } else
+                addPartitionMovementRecord(partition, pair);
+        }
+
+        private TopicPartition getTheActualPartitionToBeMoved(TopicPartition partition, String oldConsumer, String newConsumer) {
+            String topic = partition.topic();
+
+            if (!partitionMovementsByTopic.containsKey(topic))
+                return partition;
+
+            if (partitionMovements.containsKey(partition)) {
+                // this partition has previously moved
+                assert oldConsumer.equals(partitionMovements.get(partition).dstMemberId);
+                oldConsumer = partitionMovements.get(partition).srcMemberId;
+            }
+
+            Map<ConsumerPair, Set<TopicPartition>> partitionMovementsForThisTopic = partitionMovementsByTopic.get(topic);
+            ConsumerPair reversePair = new ConsumerPair(newConsumer, oldConsumer);
+            if (!partitionMovementsForThisTopic.containsKey(reversePair))
+                return partition;
+
+            return partitionMovementsForThisTopic.get(reversePair).iterator().next();
+        }
+
+        private boolean isLinked(String src, String dst, Set<ConsumerPair> pairs, List<String> currentPath) {
+            if (src.equals(dst))
+                return false;
+
+            if (pairs.isEmpty())
+                return false;
+
+            if (new ConsumerPair(src, dst).in(pairs)) {
+                currentPath.add(src);
+                currentPath.add(dst);
+                return true;
+            }
+
+            for (ConsumerPair pair: pairs)
+                if (pair.srcMemberId.equals(src)) {
+                    Set<ConsumerPair> reducedSet = new HashSet<>(pairs);
+                    reducedSet.remove(pair);
+                    currentPath.add(pair.srcMemberId);
+                    return isLinked(pair.dstMemberId, dst, reducedSet, currentPath);
+                }
+
+            return false;
+        }
+
+        private boolean in(List<String> cycle, Set<List<String>> cycles) {
+            List<String> superCycle = new ArrayList<>(cycle);
+            superCycle.remove(superCycle.size() - 1);
+            superCycle.addAll(cycle);
+            for (List<String> foundCycle: cycles) {
+                if (foundCycle.size() == cycle.size() && Collections.indexOfSubList(superCycle, foundCycle) != -1)
+                    return true;
+            }
+            return false;
+        }
+
+        private boolean hasCycles(Set<ConsumerPair> pairs) {
+            Set<List<String>> cycles = new HashSet<>();
+            for (ConsumerPair pair: pairs) {
+                Set<ConsumerPair> reducedPairs = new HashSet<>(pairs);
+                reducedPairs.remove(pair);
+                List<String> path = new ArrayList<>(Collections.singleton(pair.srcMemberId));
+                if (isLinked(pair.dstMemberId, pair.srcMemberId, reducedPairs, path) && !in(path, cycles)) {
+                    cycles.add(new ArrayList<>(path));
+                    log.error("A cycle of length " + (path.size() - 1) + " was found: " + path.toString());
+                }
+            }
+
+            // for now we want to make sure there are no partition movements of the same topic between a pair of consumers.
+            // the odds of finding a cycle among more than two consumers seem to be so low (according to various randomized
+            // tests with the given sticky algorithm) that it is not worth the added complexity of handling those cases.
+            for (List<String> cycle: cycles)
+                if (cycle.size() == 3) // indicates a cycle of length 2
+                    return true;
+            return false;
+        }
+
+        private boolean isSticky() {
+            for (Map.Entry<String, Map<ConsumerPair, Set<TopicPartition>>> topicMovements: this.partitionMovementsByTopic.entrySet()) {
+                Set<ConsumerPair> topicMovementPairs = topicMovements.getValue().keySet();
+                if (hasCycles(topicMovementPairs)) {
+                    log.error("Stickiness is violated for topic " + topicMovements.getKey()
+                            + "\nPartition movements for this topic occurred among the following consumer pairs:"
+                            + "\n" + topicMovements.getValue().toString());
+                    return false;
+                }
+            }
+
+            return true;
+        }
+    }
+
+    /**
+     * <code>ConsumerPair</code> represents a pair of Kafka consumer ids involved in a partition reassignment. Each
+     * <code>ConsumerPair</code> object, which contains a source (<code>src</code>) and a destination (<code>dst</code>)
+     * element, normally corresponds to a particular partition or topic, and indicates that the particular partition or some
+     * partition of the particular topic was moved from the source consumer to the destination consumer during the rebalance.
+     * This class is used, through the <code>PartitionMovements</code> class, by the sticky assignor and helps in determining
+     * whether a partition reassignment results in cycles among the generated graph of consumer pairs.
+     */
+    private static class ConsumerPair {
+        private final String srcMemberId;
+        private final String dstMemberId;
+
+        ConsumerPair(String srcMemberId, String dstMemberId) {
+            this.srcMemberId = srcMemberId;
+            this.dstMemberId = dstMemberId;
+        }
+
+        public String toString() {
+            return this.srcMemberId + "->" + this.dstMemberId;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((this.srcMemberId == null) ? 0 : this.srcMemberId.hashCode());
+            result = prime * result + ((this.dstMemberId == null) ? 0 : this.dstMemberId.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null)
+                return false;
+
+            if (!getClass().isInstance(obj))
+                return false;
+
+            ConsumerPair otherPair = (ConsumerPair) obj;
+            return this.srcMemberId.equals(otherPair.srcMemberId) && this.dstMemberId.equals(otherPair.dstMemberId);
+        }
+
+        private boolean in(Set<ConsumerPair> pairs) {
+            for (ConsumerPair pair: pairs)
+                if (this.equals(pair))
+                    return true;
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
index 5c97693..bc87ed0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -34,6 +34,7 @@ import java.util.Set;
  */
 public abstract class AbstractPartitionAssignor implements PartitionAssignor {
     private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class);
+    private Map<String, Subscription> subscriptions = null;
 
     /**
      * Perform the group assignment given the partition counts and member subscriptions
@@ -52,6 +53,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
 
     @Override
     public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
+        this.subscriptions = new HashMap<>(subscriptions);
         Set<String> allSubscribedTopics = new HashSet<>();
         Map<String, List<String>> topicSubscriptions = new HashMap<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
@@ -71,13 +73,17 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
 
         Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions);
 
-        // this class has maintains no user data, so just wrap the results
+        // this class maintains no user data, so just wrap the results
         Map<String, Assignment> assignments = new HashMap<>();
         for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
             assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
         return assignments;
     }
 
+    protected Map<String, Subscription> getSubscriptions() {
+        return subscriptions;
+    }
+
     @Override
     public void onAssignment(Assignment assignment) {
         // this assignor maintains no internal state, so nothing to do

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index 392e272..f8be9a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -146,7 +146,6 @@ public class ConsumerProtocol {
         // otherwise, assume versions can be parsed as V0
     }
 
-
     private static Map<String, List<Integer>> asMap(Collection<TopicPartition> partitions) {
         Map<String, List<Integer>> partitionMap = new HashMap<>();
         for (TopicPartition partition : partitions) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
index 86683a0..4a7c7a8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
@@ -67,7 +67,7 @@ public interface PartitionAssignor {
 
 
     /**
-     * Unique name for this assignor (e.g. "range" or "roundrobin")
+     * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky")
      * @return non-null unique name
      */
     String name();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
index 832bcd8..dc79c2e 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
@@ -75,5 +75,4 @@ public final class TopicPartition implements Serializable {
     public String toString() {
         return topic + "-" + partition;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
new file mode 100644
index 0000000..e9cc828
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -0,0 +1,689 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+public class StickyAssignorTest {
+
+    private StickyAssignor assignor = new StickyAssignor();
+
+    @Test
+    public void testOneConsumerNoTopic() {
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, Collections.<String>emptyList());
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(Collections.singleton(consumerId), assignment.keySet());
+        assertTrue(assignment.get(consumerId).isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void testOneConsumerNonexistentTopic() {
+        String topic = "topic";
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 0);
+        Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, topics(topic));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+
+        assertEquals(Collections.singleton(consumerId), assignment.keySet());
+        assertTrue(assignment.get(consumerId).isEmpty());
+
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void testOneConsumerOneTopic() {
+        String topic = "topic";
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+        Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, topics(topic));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(Arrays.asList(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId));
+
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void testOnlyAssignsPartitionsFromSubscribedTopics() {
+        String topic = "topic";
+        String otherTopic = "other";
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+        partitionsPerTopic.put(otherTopic, 3);
+        Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, topics(topic));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(Arrays.asList(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId));
+
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void testOneConsumerMultipleTopics() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumerId = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 1);
+        partitionsPerTopic.put(topic2, 2);
+        Map<String, List<String>> subscriptions = Collections.singletonMap(consumerId, topics(topic1, topic2));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(Arrays.asList(tp(topic1, 0), tp(topic2, 0), tp(topic2, 1)), assignment.get(consumerId));
+
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void testTwoConsumersOneTopicOnePartition() {
+        String topic = "topic";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 1);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put(consumer1, topics(topic));
+        subscriptions.put(consumer2, topics(topic));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(Arrays.asList(tp(topic, 0)), assignment.get(consumer1));
+        assertEquals(Collections.<TopicPartition>emptyList(), assignment.get(consumer2));
+
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void testTwoConsumersOneTopicTwoPartitions() {
+        String topic = "topic";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 2);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put(consumer1, topics(topic));
+        subscriptions.put(consumer2, topics(topic));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(Arrays.asList(tp(topic, 0)), assignment.get(consumer1));
+        assertEquals(Arrays.asList(tp(topic, 1)), assignment.get(consumer2));
+
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void testMultipleConsumersMixedTopicSubscriptions() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+        String consumer3 = "consumer3";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 3);
+        partitionsPerTopic.put(topic2, 2);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put(consumer1, topics(topic1));
+        subscriptions.put(consumer2, topics(topic1, topic2));
+        subscriptions.put(consumer3, topics(topic1));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(Arrays.asList(tp(topic1, 0), tp(topic1, 2)), assignment.get(consumer1));
+        assertEquals(Arrays.asList(tp(topic2, 0), tp(topic2, 1)), assignment.get(consumer2));
+        assertEquals(Arrays.asList(tp(topic1, 1)), assignment.get(consumer3));
+
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void testTwoConsumersTwoTopicsSixPartitions() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer1";
+        String consumer2 = "consumer2";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 3);
+        partitionsPerTopic.put(topic2, 3);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put(consumer1, topics(topic1, topic2));
+        subscriptions.put(consumer2, topics(topic1, topic2));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(Arrays.asList(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1)), assignment.get(consumer1));
+        assertEquals(Arrays.asList(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), assignment.get(consumer2));
+
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+    }
+
+    @Test
+    public void testAddRemoveConsumerOneTopic() {
+        String topic = "topic";
+        String consumer1 = "consumer";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put(consumer1, topics(topic));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(Arrays.asList(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumer1));
+
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+
+        String consumer2 = "consumer2";
+        subscriptions.put(consumer2, topics(topic));
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertEquals(Arrays.asList(tp(topic, 1), tp(topic, 2)), assignment.get(consumer1));
+        assertEquals(Arrays.asList(tp(topic, 0)), assignment.get(consumer2));
+
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+        assertTrue(assignor.isSticky());
+
+        subscriptions.remove(consumer1);
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        assertTrue(assignment.get(consumer2).contains(tp(topic, 0)));
+        assertTrue(assignment.get(consumer2).contains(tp(topic, 1)));
+        assertTrue(assignment.get(consumer2).contains(tp(topic, 2)));
+
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+        assertTrue(assignor.isSticky());
+    }
+
+    /**
+     * This unit test performs sticky assignment for a scenario that round robin assignor handles poorly.
+     * Topics (partitions per topic): topic1 (2), topic2 (1), topic3 (2), topic4 (1), topic5 (2)
+     * Subscriptions:
+     *  - consumer1: topic1, topic2, topic3, topic4, topic5
+     *  - consumer2: topic1, topic3, topic5
+     *  - consumer3: topic1, topic3, topic5
+     *  - consumer4: topic1, topic2, topic3, topic4, topic5
+     * Round Robin Assignment Result:
+     *  - consumer1: topic1-0, topic3-0, topic5-0
+     *  - consumer2: topic1-1, topic3-1, topic5-1
+     *  - consumer3:
+     *  - consumer4: topic2-0, topic4-0
+     * Sticky Assignment Result:
+     *  - consumer1: topic2-0, topic3-0
+     *  - consumer2: topic1-0, topic3-1
+     *  - consumer3: topic1-1, topic5-0
+     *  - consumer4: topic4-0, topic5-1
+     */
+    @Test
+    public void testPoorRoundRobinAssignmentScenario() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 1; i <= 5; i++)
+            partitionsPerTopic.put(String.format("topic%d", i), (i % 2) + 1);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put("consumer1", Arrays.asList("topic1", "topic2", "topic3", "topic4", "topic5"));
+        subscriptions.put("consumer2", Arrays.asList("topic1", "topic3", "topic5"));
+        subscriptions.put("consumer3", Arrays.asList("topic1", "topic3", "topic5"));
+        subscriptions.put("consumer4", Arrays.asList("topic1", "topic2", "topic3", "topic4", "topic5"));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+    }
+
+    @Test
+    public void testAddRemoveTopicTwoConsumers() {
+        String topic = "topic";
+        String consumer1 = "consumer";
+        String consumer2 = "consumer2";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic, 3);
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put(consumer1, topics(topic));
+        subscriptions.put(consumer2, topics(topic));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        // verify balance
+        assertTrue(isFullyBalanced(assignment));
+        verifyValidityAndBalance(subscriptions, assignment);
+        // verify stickiness
+        List<TopicPartition> consumer1Assignment1 = assignment.get(consumer1);
+        List<TopicPartition> consumer2Assignment1 = assignment.get(consumer2);
+        assertTrue((consumer1Assignment1.size() == 1 && consumer2Assignment1.size() == 2) ||
+                   (consumer1Assignment1.size() == 2 && consumer2Assignment1.size() == 1));
+
+        String topic2 = "topic2";
+        partitionsPerTopic.put(topic2, 3);
+        subscriptions.put(consumer1, topics(topic, topic2));
+        subscriptions.put(consumer2, topics(topic, topic2));
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        // verify balance
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+        // verify stickiness
+        List<TopicPartition> consumer1assignment = assignment.get(consumer1);
+        List<TopicPartition> consumer2assignment = assignment.get(consumer2);
+        assertTrue(consumer1assignment.size() == 3 && consumer2assignment.size() == 3);
+        assertTrue(consumer1assignment.containsAll(consumer1Assignment1));
+        assertTrue(consumer2assignment.containsAll(consumer2Assignment1));
+        assertTrue(assignor.isSticky());
+
+        partitionsPerTopic.remove(topic);
+        subscriptions.put(consumer1, topics(topic2));
+        subscriptions.put(consumer2, topics(topic2));
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        // verify balance
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(isFullyBalanced(assignment));
+        // verify stickiness
+        List<TopicPartition> consumer1Assignment3 = assignment.get(consumer1);
+        List<TopicPartition> consumer2Assignment3 = assignment.get(consumer2);
+        assertTrue((consumer1Assignment3.size() == 1 && consumer2Assignment3.size() == 2) ||
+                   (consumer1Assignment3.size() == 2 && consumer2Assignment3.size() == 1));
+        assertTrue(consumer1assignment.containsAll(consumer1Assignment3));
+        assertTrue(consumer2assignment.containsAll(consumer2Assignment3));
+        assertTrue(assignor.isSticky());
+    }
+
+    @Test
+    public void testReassignmentAfterOneConsumerLeaves() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 1; i < 20; i++)
+            partitionsPerTopic.put(String.format("topic%02d", i), i);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        for (int i = 1; i < 20; i++) {
+            List<String> topics = new ArrayList<String>();
+            for (int j = 1; j <= i; j++)
+                topics.add(String.format("topic%02d", j));
+            subscriptions.put(String.format("consumer%02d", i), topics);
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+
+        subscriptions.remove("consumer10");
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(assignor.isSticky());
+    }
+
+    @Test
+    public void testReassignmentAfterOneConsumerAdded() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put("topic", 20);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        for (int i = 1; i < 10; i++)
+            subscriptions.put(String.format("consumer%02d", i), Collections.singletonList("topic"));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+
+        subscriptions.put("consumer10", Collections.singletonList("topic"));
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(assignor.isSticky());
+    }
+
+    @Test
+    public void testSameSubscriptions() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 1; i < 15; i++)
+            partitionsPerTopic.put(String.format("topic%02d", i), i);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        for (int i = 1; i < 9; i++) {
+            List<String> topics = new ArrayList<String>();
+            for (int j = 1; j <= partitionsPerTopic.size(); j++)
+                topics.add(String.format("topic%02d", j));
+            subscriptions.put(String.format("consumer%02d", i), topics);
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+
+        subscriptions.remove("consumer05");
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(assignor.isSticky());
+    }
+
+    @Test
+    public void testLargeAssignmentWithMultipleConsumersLeaving() {
+        Random rand = new Random();
+        int topicCount = 40;
+        int consumerCount = 200;
+        // every topic gets between 1 and 10 partitions
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 0; i < topicCount; i++)
+            partitionsPerTopic.put(getTopicName(i, topicCount), rand.nextInt(10) + 1);
+        // each consumer subscribes to a list of 0-19 randomly chosen topics (repeats possible); the size is drawn once
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        for (int i = 0; i < consumerCount; i++) {
+            List<String> topics = new ArrayList<>();
+            for (int j = 0, subscribedCount = rand.nextInt(20); j < subscribedCount; j++)
+                topics.add(getTopicName(rand.nextInt(topicCount), topicCount));
+            subscriptions.put(getConsumerName(i, consumerCount), topics);
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        // up to 100 consumers leave the group (a random pick may name a consumer that was already removed)
+        for (int i = 0; i < 100; ++i) {
+            String c = getConsumerName(rand.nextInt(consumerCount), consumerCount);
+            subscriptions.remove(c);
+        }
+
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(assignor.isSticky());
+    }
+
+    @Test
+    public void testNewSubscription() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 1; i < 5; i++)
+            partitionsPerTopic.put(String.format("topic%02d", i), 1);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        for (int i = 0; i < 3; i++) {
+            List<String> topics = new ArrayList<String>();
+            for (int j = i; j <= 3 * i - 2; j++)
+                topics.add(String.format("topic%02d", j));
+            subscriptions.put(String.format("consumer%02d", i), topics);
+        }
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+
+        subscriptions.get("consumer00").add("topic01");
+
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(assignor.isSticky());
+    }
+
+    @Test
+    public void testReassignmentWithRandomSubscriptionsAndChanges() {
+        final int minNumConsumers = 20;
+        final int maxNumConsumers = 40;
+        final int minNumTopics = 10;
+        final int maxNumTopics = 20;
+
+        for (int round = 1; round <= 100; ++round) {
+            int numTopics = minNumTopics + new Random().nextInt(maxNumTopics - minNumTopics);
+
+            ArrayList<String> topics = new ArrayList<>();
+            for (int i = 0; i < numTopics; ++i)
+                topics.add(getTopicName(i, maxNumTopics));
+
+            Map<String, Integer> partitionsPerTopic = new HashMap<>();
+            for (int i = 0; i < numTopics; ++i)
+                partitionsPerTopic.put(getTopicName(i, maxNumTopics), i + 1);
+
+            int numConsumers = minNumConsumers + new Random().nextInt(maxNumConsumers - minNumConsumers);
+
+            Map<String, List<String>> subscriptions = new HashMap<>();
+            for (int i = 0; i < numConsumers; ++i) {
+                List<String> sub = Utils.sorted(getRandomSublist(topics));
+                subscriptions.put(getConsumerName(i, maxNumConsumers), sub);
+            }
+
+            StickyAssignor assignor = new StickyAssignor();
+
+            Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+            verifyValidityAndBalance(subscriptions, assignment);
+
+            subscriptions.clear();
+            for (int i = 0; i < numConsumers; ++i) {
+                List<String> sub = Utils.sorted(getRandomSublist(topics));
+                subscriptions.put(getConsumerName(i, maxNumConsumers), sub);
+            }
+
+            assignment = assignor.assign(partitionsPerTopic, subscriptions);
+            verifyValidityAndBalance(subscriptions, assignment);
+            assertTrue(assignor.isSticky());
+        }
+    }
+
+    @Test
+    public void testMoveExistingAssignments() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (int i = 1; i <= 6; i++)
+            partitionsPerTopic.put(String.format("topic%02d", i), 1);
+
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put("consumer01", topics("topic01", "topic02"));
+        subscriptions.put("consumer02", topics("topic01", "topic02", "topic03", "topic04"));
+        subscriptions.put("consumer03", topics("topic02", "topic03", "topic04", "topic05", "topic06"));
+
+        assignor.currentAssignment.put("consumer01", new ArrayList<>(Arrays.asList(tp("topic01", 0))));
+        assignor.currentAssignment.put("consumer02", new ArrayList<>(Arrays.asList(tp("topic02", 0), tp("topic03", 0))));
+        assignor.currentAssignment.put("consumer03", new ArrayList<>(Arrays.asList(tp("topic04", 0), tp("topic05", 0), tp("topic06", 0))));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+    }
+
+    @Test
+    public void testStickiness() {
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put("topic01", 3);
+        Map<String, List<String>> subscriptions = new HashMap<>();
+        subscriptions.put("consumer01", topics("topic01"));
+        subscriptions.put("consumer02", topics("topic01"));
+        subscriptions.put("consumer03", topics("topic01"));
+        subscriptions.put("consumer04", topics("topic01"));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        Map<String, TopicPartition> partitionsAssigned = new HashMap<>();
+
+        Set<Entry<String, List<TopicPartition>>> assignments = assignment.entrySet();
+        for (Map.Entry<String, List<TopicPartition>> entry: assignments) {
+            String consumer = entry.getKey();
+            List<TopicPartition> topicPartitions = entry.getValue();
+            int size = topicPartitions.size();
+            assertTrue("Consumer " + consumer + " is assigned more topic partitions than expected.", size <= 1);
+            if (size == 1)
+                partitionsAssigned.put(consumer, topicPartitions.get(0));
+        }
+
+        // removing the potential group leader
+        subscriptions.remove("consumer01");
+
+        assignment = assignor.assign(partitionsPerTopic, subscriptions);
+        verifyValidityAndBalance(subscriptions, assignment);
+        assertTrue(assignor.isSticky());
+
+        assignments = assignment.entrySet();
+        for (Map.Entry<String, List<TopicPartition>> entry: assignments) {
+            String consumer = entry.getKey();
+            List<TopicPartition> topicPartitions = entry.getValue();
+            assertEquals("Consumer " + consumer + " is assigned more topic partitions than expected.", 1, topicPartitions.size());
+            assertTrue("Stickiness was not honored for consumer " + consumer,
+                (!partitionsAssigned.containsKey(consumer)) || (assignment.get(consumer).contains(partitionsAssigned.get(consumer))));
+        }
+    }
+
+    private String getTopicName(int i, int maxNum) {
+        return getCanonicalName("t", i, maxNum);
+    }
+
+    private String getConsumerName(int i, int maxNum) {
+        return getCanonicalName("c", i, maxNum);
+    }
+
+    private String getCanonicalName(String str, int i, int maxNum) {
+        return str + pad(i, Integer.toString(maxNum).length());
+    }
+
+    private String pad(int num, int digits) {
+        StringBuilder sb = new StringBuilder();
+        int iDigits = Integer.toString(num).length();
+
+        for (int i = 1; i <= digits - iDigits; ++i)
+            sb.append("0");
+
+        sb.append(num);
+        return sb.toString();
+    }
+
+    private static List<String> topics(String... topics) {
+        return Arrays.asList(topics);
+    }
+
+    private static TopicPartition tp(String topic, int partition) {
+        return new TopicPartition(topic, partition);
+    }
+
+    private static boolean isFullyBalanced(Map<String, List<TopicPartition>> assignment) {
+        int min = Integer.MAX_VALUE;
+        int max = Integer.MIN_VALUE;
+        for (List<TopicPartition> topicPartitions: assignment.values()) {
+            int size = topicPartitions.size();
+            if (size < min)
+                min = size;
+            if (size > max)
+                max = size;
+        }
+        return max - min <= 1;
+    }
+
+    private static List<String> getRandomSublist(ArrayList<String> list) {
+        List<String> selectedItems = new ArrayList<>(list);
+        int len = list.size();
+        Random random = new Random();
+        int howManyToRemove = random.nextInt(len);
+
+        for (int i = 1; i <= howManyToRemove; ++i)
+            selectedItems.remove(random.nextInt(selectedItems.size()));
+
+        return selectedItems;
+    }
+
+    /**
+     * Verifies that the given assignment is valid and balanced with respect to the given subscriptions.
+     * Validity requirements:
+     * - each consumer is subscribed to topics of all partitions assigned to it, and
+     * - each partition is assigned to no more than one consumer
+     * Balance requirements:
+     * - the assignment is fully balanced (the numbers of topic partitions assigned to consumers differ by at most one), or
+     * - there is no topic partition that can be moved from one consumer to another with 2+ fewer topic partitions
+     *
+     * @param subscriptions topic subscriptions of each consumer
+     * @param assignments assignment (consumer to its topic partitions) to check for validity and balance
+     */
+    private static void verifyValidityAndBalance(Map<String, List<String>> subscriptions, Map<String, List<TopicPartition>> assignments) {
+        int size = subscriptions.size();
+        assertEquals("Error: The number of consumers in the assignment differs from the number of subscribed consumers", size, assignments.size());
+
+        List<String> consumers = Utils.sorted(assignments.keySet());
+
+        for (int i = 0; i < size; ++i) {
+            String consumer = consumers.get(i);
+            List<TopicPartition> partitions = assignments.get(consumer);
+            for (TopicPartition partition: partitions)
+                assertTrue("Error: Partition " + partition + " is assigned to " + consumer + ", but it is not subscribed to topic " + partition.topic()
+                        + "\nSubscriptions: " + subscriptions.toString() + "\nAssignments: " + assignments.toString(),
+                        subscriptions.get(consumer).contains(partition.topic()));
+
+            if (i == size - 1)
+                continue;
+            // compare this consumer against every later consumer, so each unordered pair is checked once
+            for (int j = i + 1; j < size; ++j) {
+                String otherConsumer = consumers.get(j);
+                List<TopicPartition> otherPartitions = assignments.get(otherConsumer);
+
+                Set<TopicPartition> intersection = new HashSet<>(partitions);
+                intersection.retainAll(otherPartitions);
+                assertTrue("Error: Consumers " + consumer + " and " + otherConsumer + " have common partitions assigned to them: " + intersection.toString()
+                        + "\nSubscriptions: " + subscriptions.toString() + "\nAssignments: " + assignments.toString(),
+                        intersection.isEmpty());
+
+                int len = partitions.size();
+                int otherLen = otherPartitions.size();
+                // a pair whose sizes differ by at most one is already balanced
+                if (Math.abs(len - otherLen) <= 1)
+                    continue;
+
+                Map<String, List<Integer>> map = CollectionUtils.groupDataByTopic(partitions);
+                Map<String, List<Integer>> otherMap = CollectionUtils.groupDataByTopic(otherPartitions);
+                // the consumer with 2+ more partitions must not hold any topic that the smaller consumer also holds
+                if (len > otherLen) {
+                    for (String topic: map.keySet())
+                        assertTrue("Error: Some partitions can be moved from " + consumer + " to " + otherConsumer + " to achieve a better balance"
+                                + "\n" + consumer + " has " + len + " partitions, and " + otherConsumer + " has " + otherLen + " partitions."
+                                + "\nSubscriptions: " + subscriptions.toString() + "\nAssignments: " + assignments.toString(),
+                                !otherMap.containsKey(topic));
+                }
+
+                if (otherLen > len) {
+                    for (String topic: otherMap.keySet())
+                        assertTrue("Error: Some partitions can be moved from " + otherConsumer + " to " + consumer + " to achieve a better balance"
+                                + "\n" + consumer + " has " + len + " partitions, and " + otherConsumer + " has " + otherLen + " partitions."
+                                + "\nSubscriptions: " + subscriptions.toString() + "\nAssignments: " + assignments.toString(),
+                                !map.containsKey(topic));
+                }
+            }
+        }
+    }
+}