You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/28 04:13:03 UTC

[kafka] branch trunk updated: KAFKA-8356: add static membership info to round robin assignor (#6815)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fbf6a76  KAFKA-8356: add static membership info to round robin assignor (#6815)
fbf6a76 is described below

commit fbf6a76fc40fc5fecd679ef6484a0b92a4ab3971
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu Jun 27 21:12:39 2019 -0700

    KAFKA-8356: add static membership info to round robin assignor (#6815)
    
    The purpose here is to leverage static membership information during round robin consumer assignment, because persistent member id could help make the assignment remain the same during rebalance.
    The comparison logic is changed to:
    
    1. If member A and member B both have group.instance.id, then compare their group.instance.id
    2. If member A has group.instance.id, while member B doesn't, then A < B
    3. If both member A and B don't have group.instance.id, compare their member.id
    
    In round robin assignor, we use ephemeral member.id to sort the members in order for assignment. This semantic is not stable and could trigger unnecessary shuffle of tasks. By leveraging group.instance.id the static member assignment shall be persist when satisfying following conditions:
    
    1. number of members remain the same across generation
    2. static members' identities persist across generation
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/consumer/RoundRobinAssignor.java |  68 ++++--
 .../kafka/clients/consumer/StickyAssignor.java     |  13 +-
 .../internals/AbstractPartitionAssignor.java       |  49 ++++-
 .../kafka/clients/consumer/RangeAssignorTest.java  |  18 +-
 .../clients/consumer/RoundRobinAssignorTest.java   | 208 ++++++++++++++++--
 .../kafka/clients/consumer/StickyAssignorTest.java | 132 +++++-------
 .../internals/AbstractPartitionAssignorTest.java   |  89 ++++++++
 .../internals/StreamsPartitionAssignorTest.java    | 240 ++++++++++-----------
 8 files changed, 561 insertions(+), 256 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index 02cdc45..0981400 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -34,8 +34,9 @@ import java.util.TreeSet;
  * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
  * will be within a delta of exactly one across all consumers.)
  *
- * <p>For example, suppose there are two consumers <code>C0</code> and <code>C1</code>, two topics <code>t0</code> and <code>t1</code>, and each topic has 3 partitions,
- * resulting in partitions <code>t0p0</code>, <code>t0p1</code>, <code>t0p2</code>, <code>t1p0</code>, <code>t1p1</code>, and <code>t1p2</code>.
+ * <p>For example, suppose there are two consumers <code>C0</code> and <code>C1</code>, two topics <code>t0</code> and <code>t1</code>,
+ * and each topic has 3 partitions, resulting in partitions <code>t0p0</code>, <code>t0p1</code>, <code>t0p2</code>,
+ * <code>t1p0</code>, <code>t1p1</code>, and <code>t1p2</code>.
  *
  * <p>The assignment will be:
  * <ul>
@@ -46,9 +47,12 @@ import java.util.TreeSet;
  * <p>When subscriptions differ across consumer instances, the assignment process still considers each
  * consumer instance in round robin fashion but skips over an instance if it is not subscribed to
  * the topic. Unlike the case when subscriptions are identical, this can result in imbalanced
- * assignments. For example, we have three consumers <code>C0</code>, <code>C1</code>, <code>C2</code>, and three topics <code>t0</code>, <code>t1</code>, <code>t2</code>,
- * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are <code>t0p0</code>, <code>t1p0</code>, <code>t1p1</code>, <code>t2p0</code>,
- * <code>t2p1</code>, <code>t2p2</code>. <code>C0</code> is subscribed to <code>t0</code>; <code>C1</code> is subscribed to <code>t0</code>, <code>t1</code>; and <code>C2</code> is subscribed to <code>t0</code>, <code>t1</code>, <code>t2</code>.
+ * assignments. For example, we have three consumers <code>C0</code>, <code>C1</code>, <code>C2</code>,
+ * and three topics <code>t0</code>, <code>t1</code>, <code>t2</code>, with 1, 2, and 3 partitions, respectively.
+ * Therefore, the partitions are <code>t0p0</code>, <code>t1p0</code>, <code>t1p1</code>, <code>t2p0</code>, <code>t2p1</code>, <code>t2p2</code>.
+ * <code>C0</code> is subscribed to <code>t0</code>;
+ * <code>C1</code> is subscribed to <code>t0</code>, <code>t1</code>;
+ * and <code>C2</code> is subscribed to <code>t0</code>, <code>t1</code>, <code>t2</code>.
  *
  * <p>That assignment will be:
  * <ul>
@@ -56,6 +60,41 @@ import java.util.TreeSet;
  * <li><code>C1: [t1p0]</code>
  * <li><code>C2: [t1p1, t2p0, t2p1, t2p2]</code>
  * </ul>
+ *
+ * Since the introduction of static membership, we could leverage <code>group.instance.id</code> to make the assignment behavior more sticky.
+ * For example, we have three consumers with assigned <code>member.id</code> <code>C0</code>, <code>C1</code>, <code>C2</code>,
+ * two topics <code>t0</code> and <code>t1</code>, and each topic has 3 partitions, resulting in partitions <code>t0p0</code>,
+ * <code>t0p1</code>, <code>t0p2</code>, <code>t1p0</code>, <code>t1p1</code>, and <code>t1p2</code>. We choose to honor
+ * the sorted order based on ephemeral <code>member.id</code>.
+ *
+ * <p>The assignment will be:
+ * <ul>
+ * <li><code>C0: [t0p0, t1p0]</code>
+ * <li><code>C1: [t0p1, t1p1]</code>
+ * <li><code>C2: [t0p2, t1p2]</code>
+ * </ul>
+ *
+ * After one rolling bounce, group coordinator will attempt to assign new <code>member.id</code> towards consumers,
+ * for example <code>C0</code> -> <code>C5</code> <code>C1</code> -> <code>C3</code>, <code>C2</code> -> <code>C4</code>.
+ *
+ * <p>the assignment could be completely shuffled to:
+ * <ul>
+ * <li><code>C3 (was C1): [t0p0, t1p0] (before was [t0p1, t1p1])</code>
+ * <li><code>C4 (was C2): [t0p1, t1p1] (before was [t0p2, t1p2])</code>
+ * <li><code>C5 (was C0): [t0p2, t1p2] (before was [t0p0, t1p0])</code>
+ * </ul>
+ *
+ * This issue could be mitigated by the introduction of static membership. Consumers will have individual instance ids
+ * <code>I1</code>, <code>I2</code>, <code>I3</code>. As long as
+ * 1. Number of members remain the same across generation
+ * 2. Static members' identities persist across generation
+ *
+ * <p>The assignment will always be:
+ * <ul>
+ * <li><code>I0: [t0p0, t1p0]</code>
+ * <li><code>I1: [t0p1, t1p1]</code>
+ * <li><code>I2: [t0p2, t1p2]</code>
+ * </ul>
  */
 public class RoundRobinAssignor extends AbstractPartitionAssignor {
 
@@ -63,21 +102,26 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor {
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        List<MemberInfo> memberInfoList = new ArrayList<>();
+        for (Map.Entry<String, Subscription> memberSubscription : subscriptions.entrySet()) {
+            assignment.put(memberSubscription.getKey(), new ArrayList<>());
+            memberInfoList.add(new MemberInfo(memberSubscription.getKey(),
+                                              memberSubscription.getValue().groupInstanceId()));
+        }
+
+        CircularIterator<MemberInfo> assigner = new CircularIterator<>(Utils.sorted(memberInfoList));
 
-        CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
         for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
             final String topic = partition.topic();
-            while (!subscriptions.get(assigner.peek()).topics().contains(topic))
+            while (!subscriptions.get(assigner.peek().memberId).topics().contains(topic))
                 assigner.next();
-            assignment.get(assigner.next()).add(partition);
+            assignment.get(assigner.next().memberId).add(partition);
         }
         return assignment;
     }
 
-    public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    private List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
+                                                     Map<String, Subscription> subscriptions) {
         SortedSet<String> topics = new TreeSet<>();
         for (Subscription subscription : subscriptions.values())
             topics.addAll(subscription.topics());
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index 9575ba6..3c7d010 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -223,6 +223,7 @@ public class StickyAssignor extends AbstractPartitionAssignor {
         }
     }
 
+    @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
         Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
@@ -244,19 +245,19 @@ public class StickyAssignor extends AbstractPartitionAssignor {
         }
 
         for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
-            String consumer = entry.getKey();
-            consumer2AllPotentialPartitions.put(consumer, new ArrayList<>());
+            String consumerId = entry.getKey();
+            consumer2AllPotentialPartitions.put(consumerId, new ArrayList<>());
             entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> {
                 for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
                     TopicPartition topicPartition = new TopicPartition(topic, i);
-                    consumer2AllPotentialPartitions.get(consumer).add(topicPartition);
-                    partition2AllPotentialConsumers.get(topicPartition).add(consumer);
+                    consumer2AllPotentialPartitions.get(consumerId).add(topicPartition);
+                    partition2AllPotentialConsumers.get(topicPartition).add(consumerId);
                 }
             });
 
             // add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
-            if (!currentAssignment.containsKey(consumer))
-                currentAssignment.put(consumer, new ArrayList<>());
+            if (!currentAssignment.containsKey(consumerId))
+                currentAssignment.put(consumerId, new ArrayList<>());
         }
 
         // a mapping of partition to current consumer
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
index 35eb8eb..2487daa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -39,7 +40,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
      * Perform the group assignment given the partition counts and member subscriptions
      * @param partitionsPerTopic The number of partitions for each subscribed topic. Topics not in metadata will be excluded
      *                           from this map.
-     * @param subscriptions Map from the memberId to their respective topic subscription
+     * @param subscriptions Map from the member id to their respective topic subscription
      * @return Map from each member to the list of partitions assigned to them.
      */
     public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
@@ -90,4 +91,50 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
             partitions.add(new TopicPartition(topic, i));
         return partitions;
     }
+
+    public static class MemberInfo implements Comparable<MemberInfo> {
+        public final String memberId;
+        public final Optional<String> groupInstanceId;
+
+        public MemberInfo(String memberId, Optional<String> groupInstanceId) {
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+        }
+
+        @Override
+        public int compareTo(MemberInfo otherMemberInfo) {
+            if (this.groupInstanceId.isPresent() &&
+                    otherMemberInfo.groupInstanceId.isPresent()) {
+                return this.groupInstanceId.get()
+                        .compareTo(otherMemberInfo.groupInstanceId.get());
+            } else if (this.groupInstanceId.isPresent()) {
+                return -1;
+            } else if (otherMemberInfo.groupInstanceId.isPresent()) {
+                return 1;
+            } else {
+                return this.memberId.compareTo(otherMemberInfo.memberId);
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return o instanceof MemberInfo && this.memberId.equals(((MemberInfo) o).memberId);
+        }
+
+        /**
+         * We could just use member.id to be the hashcode, since it's unique
+         * across the group.
+         */
+        @Override
+        public int hashCode() {
+            return memberId.hashCode();
+        }
+
+        @Override
+        public String toString() {
+            return "MemberInfo [member.id: " + memberId
+                    + ", group.instance.id: " + groupInstanceId.orElse("{}")
+                    + "]";
+        }
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
index 8158f54..f08ca14 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
@@ -33,16 +33,15 @@ import static org.junit.Assert.assertTrue;
 public class RangeAssignorTest {
 
     private RangeAssignor assignor = new RangeAssignor();
-
+    private String consumerId = "consumer";
+    private String topic = "topic";
 
     @Test
     public void testOneConsumerNoTopic() {
-        String consumerId = "consumer";
-
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
-                Collections.singletonMap(consumerId, new Subscription(Collections.<String>emptyList())));
+                Collections.singletonMap(consumerId, new Subscription(Collections.emptyList())));
 
         assertEquals(Collections.singleton(consumerId), assignment.keySet());
         assertTrue(assignment.get(consumerId).isEmpty());
@@ -50,9 +49,6 @@ public class RangeAssignorTest {
 
     @Test
     public void testOneConsumerNonexistentTopic() {
-        String topic = "topic";
-        String consumerId = "consumer";
-
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
                 Collections.singletonMap(consumerId, new Subscription(topics(topic))));
@@ -62,9 +58,6 @@ public class RangeAssignorTest {
 
     @Test
     public void testOneConsumerOneTopic() {
-        String topic = "topic";
-        String consumerId = "consumer";
-
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
 
@@ -77,9 +70,7 @@ public class RangeAssignorTest {
 
     @Test
     public void testOnlyAssignsPartitionsFromSubscribedTopics() {
-        String topic = "topic";
         String otherTopic = "other";
-        String consumerId = "consumer";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
@@ -95,7 +86,6 @@ public class RangeAssignorTest {
     public void testOneConsumerMultipleTopics() {
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String consumerId = "consumer";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic1, 1);
@@ -123,7 +113,7 @@ public class RangeAssignorTest {
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
         assertAssignment(partitions(tp(topic, 0)), assignment.get(consumer1));
-        assertAssignment(Collections.<TopicPartition>emptyList(), assignment.get(consumer2));
+        assertAssignment(Collections.emptyList(), assignment.get(consumer2));
     }
 
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
index 799a58a..fa68406 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
@@ -16,15 +16,18 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.MemberInfo;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -32,25 +35,21 @@ import static org.junit.Assert.assertTrue;
 public class RoundRobinAssignorTest {
 
     private RoundRobinAssignor assignor = new RoundRobinAssignor();
-
+    private String topic = "topic";
+    private String consumerId = "consumer";
 
     @Test
     public void testOneConsumerNoTopic() {
-        String consumerId = "consumer";
-
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
-                Collections.singletonMap(consumerId, new Subscription(Collections.<String>emptyList())));
+                Collections.singletonMap(consumerId, new Subscription(Collections.emptyList())));
         assertEquals(Collections.singleton(consumerId), assignment.keySet());
         assertTrue(assignment.get(consumerId).isEmpty());
     }
 
     @Test
     public void testOneConsumerNonexistentTopic() {
-        String topic = "topic";
-        String consumerId = "consumer";
-
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
                 Collections.singletonMap(consumerId, new Subscription(topics(topic))));
@@ -61,9 +60,6 @@ public class RoundRobinAssignorTest {
 
     @Test
     public void testOneConsumerOneTopic() {
-        String topic = "topic";
-        String consumerId = "consumer";
-
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
 
@@ -74,9 +70,7 @@ public class RoundRobinAssignorTest {
 
     @Test
     public void testOnlyAssignsPartitionsFromSubscribedTopics() {
-        String topic = "topic";
         String otherTopic = "other";
-        String consumerId = "consumer";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
@@ -91,7 +85,6 @@ public class RoundRobinAssignorTest {
     public void testOneConsumerMultipleTopics() {
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String consumerId = "consumer";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic1, 1);
@@ -104,7 +97,6 @@ public class RoundRobinAssignorTest {
 
     @Test
     public void testTwoConsumersOneTopicOnePartition() {
-        String topic = "topic";
         String consumer1 = "consumer1";
         String consumer2 = "consumer2";
 
@@ -122,7 +114,6 @@ public class RoundRobinAssignorTest {
 
     @Test
     public void testTwoConsumersOneTopicTwoPartitions() {
-        String topic = "topic";
         String consumer1 = "consumer1";
         String consumer2 = "consumer2";
 
@@ -162,7 +153,7 @@ public class RoundRobinAssignorTest {
     }
 
     @Test
-    public void testTwoConsumersTwoTopicsSixPartitions() {
+    public void testTwoDynamicConsumersTwoTopicsSixPartitions() {
         String topic1 = "topic1";
         String topic2 = "topic2";
         String consumer1 = "consumer1";
@@ -181,6 +172,191 @@ public class RoundRobinAssignorTest {
         assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), assignment.get(consumer2));
     }
 
+    @Test
+    public void testTwoStaticConsumersTwoTopicsSixPartitions() {
+        // although consumer 2 has a higher rank than 1, the comparison happens on
+        // instance id level.
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer-b";
+        String instance1 = "instance1";
+        String consumer2 = "consumer-a";
+        String instance2 = "instance2";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 3);
+        partitionsPerTopic.put(topic2, 3);
+
+        Map<String, Subscription> consumers = new HashMap<>();
+        Subscription consumer1Subscription = new Subscription(topics(topic1, topic2),
+                                                              null,
+                                                              Collections.emptyList());
+        consumer1Subscription.setGroupInstanceId(Optional.of(instance1));
+        consumers.put(consumer1, consumer1Subscription);
+        Subscription consumer2Subscription = new Subscription(topics(topic1, topic2),
+                                                              null,
+                                                              Collections.emptyList());
+        consumer2Subscription.setGroupInstanceId(Optional.of(instance2));
+        consumers.put(consumer2, consumer2Subscription);
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+        assertEquals(partitions(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), assignment.get(consumer2));
+    }
+
+    @Test
+    public void testOneStaticConsumerAndOneDynamicConsumerTwoTopicsSixPartitions() {
+        // although consumer 2 has a higher rank than 1, consumer 1 will win the comparison
+        // because it has instance id while consumer 2 doesn't.
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer-b";
+        String instance1 = "instance1";
+        String consumer2 = "consumer-a";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 3);
+        partitionsPerTopic.put(topic2, 3);
+
+        Map<String, Subscription> consumers = new HashMap<>();
+
+        Subscription consumer1Subscription = new Subscription(topics(topic1, topic2),
+                                                              null,
+                                                              Collections.emptyList());
+        consumer1Subscription.setGroupInstanceId(Optional.of(instance1));
+        consumers.put(consumer1, consumer1Subscription);
+        consumers.put(consumer2, new Subscription(topics(topic1, topic2)));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+        assertEquals(partitions(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1)), assignment.get(consumer1));
+        assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), assignment.get(consumer2));
+    }
+
+    @Test
+    public void testStaticMemberAssignmentPersistent() {
+        // Have 3 static members instance1, instance2, instance3 to be persistent
+        // across generations. Their assignment shall be the same.
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer1";
+        String instance1 = "instance1";
+        String consumer2 = "consumer2";
+        String instance2 = "instance2";
+        String consumer3 = "consumer3";
+        String instance3 = "instance3";
+
+        List<MemberInfo> staticMemberInfos = new ArrayList<>();
+        staticMemberInfos.add(new MemberInfo(consumer1, Optional.of(instance1)));
+        staticMemberInfos.add(new MemberInfo(consumer2, Optional.of(instance2)));
+        staticMemberInfos.add(new MemberInfo(consumer3, Optional.of(instance3)));
+
+        // Consumer 4 is a dynamic member.
+        String consumer4 = "consumer4";
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 3);
+        partitionsPerTopic.put(topic2, 3);
+        Map<String, Subscription> consumers = new HashMap<>();
+        for (MemberInfo m : staticMemberInfos) {
+            Subscription subscription = new Subscription(topics(topic1, topic2),
+                                                         null,
+                                                         Collections.emptyList());
+            subscription.setGroupInstanceId(m.groupInstanceId);
+            consumers.put(m.memberId, subscription);
+        }
+        consumers.put(consumer4, new Subscription(topics(topic1, topic2)));
+
+        Map<String, List<TopicPartition>> expectedAssignment = new HashMap<>();
+        expectedAssignment.put(consumer1, partitions(tp(topic1, 0), tp(topic2, 1)));
+        expectedAssignment.put(consumer2, partitions(tp(topic1, 1), tp(topic2, 2)));
+        expectedAssignment.put(consumer3, partitions(tp(topic1, 2)));
+        expectedAssignment.put(consumer4, partitions(tp(topic2, 0)));
+
+        Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
+        assertEquals(expectedAssignment, assignment);
+
+        // Replace dynamic member 4 with a new dynamic member 5.
+        consumers.remove(consumer4);
+        String consumer5 = "consumer5";
+        consumers.put(consumer5, new Subscription(topics(topic1, topic2)));
+
+        expectedAssignment.remove(consumer4);
+        expectedAssignment.put(consumer5, partitions(tp(topic2, 0)));
+        assignment = assignor.assign(partitionsPerTopic, consumers);
+        assertEquals(expectedAssignment, assignment);
+    }
+
+    @Test
+    public void testStaticMemberAssignmentPersistentAfterMemberIdChanges() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        String consumer1 = "consumer1";
+        String instance1 = "instance1";
+        String consumer2 = "consumer2";
+        String instance2 = "instance2";
+        String consumer3 = "consumer3";
+        String instance3 = "instance3";
+        Map<String, String> memberIdToInstanceId = new HashMap<>();
+        memberIdToInstanceId.put(consumer1, instance1);
+        memberIdToInstanceId.put(consumer2, instance2);
+        memberIdToInstanceId.put(consumer3, instance3);
+
+        List<String> memberIdList = Arrays.asList(consumer1, consumer2, consumer3);
+        Map<String, List<TopicPartition>> staticAssignment =
+            checkStaticAssignment(topic1, topic2, memberIdList, memberIdToInstanceId);
+        memberIdToInstanceId.clear();
+
+        // Now switch the member.id fields for each member info, the assignment should
+        // stay the same as last time.
+        String consumer4 = "consumer4";
+        String consumer5 = "consumer5";
+        memberIdToInstanceId.put(consumer4, instance1);
+        memberIdToInstanceId.put(consumer5, instance2);
+        memberIdToInstanceId.put(consumer1, instance3);
+        memberIdList = Arrays.asList(consumer4, consumer5, consumer1);
+        Map<String, List<TopicPartition>> newStaticAssignment =
+            checkStaticAssignment(topic1, topic2, memberIdList, memberIdToInstanceId);
+
+        assertEquals(staticAssignment, newStaticAssignment);
+    }
+
+    private Map<String, List<TopicPartition>> checkStaticAssignment(String topic1,
+                                                                    String topic2,
+                                                                    List<String> memberIdList,
+                                                                    Map<String, String> memberIdToInstanceId) {
+        List<MemberInfo> staticMemberInfos = new ArrayList<>();
+        for (Map.Entry<String, String> entry : memberIdToInstanceId.entrySet()) {
+            staticMemberInfos.add(new MemberInfo(entry.getKey(), Optional.of(entry.getValue())));
+        }
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        partitionsPerTopic.put(topic1, 3);
+        partitionsPerTopic.put(topic2, 3);
+        Map<String, Subscription> consumers = new HashMap<>();
+        for (MemberInfo m : staticMemberInfos) {
+            Subscription subscription = new Subscription(topics(topic1, topic2),
+                                                         null,
+                                                         Collections.emptyList());
+            subscription.setGroupInstanceId(m.groupInstanceId);
+            consumers.put(m.memberId, subscription);
+        }
+
+        Map<String, List<TopicPartition>> expectedInstanceAssignment = new HashMap<>();
+        for (int i = 0; i < memberIdList.size(); i++) {
+            expectedInstanceAssignment.put(memberIdToInstanceId.get(memberIdList.get(i)),
+                                           partitions(tp(topic1, i), tp(topic2, i)));
+        }
+
+        Map<String, List<TopicPartition>> assignmentByMemberId =
+            assignor.assign(partitionsPerTopic, consumers);
+        Map<String, List<TopicPartition>> assignmentByInstanceId = new HashMap<>();
+        for (String memberId : memberIdList) {
+            assignmentByInstanceId.put(memberIdToInstanceId.get(memberId),
+                                       assignmentByMemberId.get(memberId));
+        }
+        assertEquals(expectedInstanceAssignment, assignmentByInstanceId);
+        return assignmentByInstanceId;
+    }
+
     private static List<String> topics(String... topics) {
         return Arrays.asList(topics);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index a1fe0cd..89f0d37 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -38,19 +35,33 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 import org.apache.kafka.common.utils.Utils;
+import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class StickyAssignorTest {
 
     private StickyAssignor assignor = new StickyAssignor();
+    private String consumerId = "consumer";
+    private Map<String, Subscription> subscriptions;
+    private String topic = "topic";
+
+    @Before
+    public void setUp() {
+        if (subscriptions != null) {
+            subscriptions.clear();
+        } else {
+            subscriptions = new HashMap<>();
+        }
+    }
 
     @Test
     public void testOneConsumerNoTopic() {
-        String consumerId = "consumer";
-
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
-        Map<String, Subscription> subscriptions =
-                Collections.singletonMap(consumerId, new Subscription(Collections.emptyList()));
+        subscriptions = Collections.singletonMap(consumerId, new Subscription(Collections.emptyList()));
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
         assertEquals(Collections.singleton(consumerId), assignment.keySet());
@@ -62,12 +73,9 @@ public class StickyAssignorTest {
 
     @Test
     public void testOneConsumerNonexistentTopic() {
-        String topic = "topic";
-        String consumerId = "consumer";
-
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 0);
-        Map<String, Subscription> subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic)));
+        subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
 
@@ -80,12 +88,9 @@ public class StickyAssignorTest {
 
     @Test
     public void testOneConsumerOneTopic() {
-        String topic = "topic";
-        String consumerId = "consumer";
-
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
-        Map<String, Subscription> subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic)));
+        subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
         assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId));
@@ -96,14 +101,12 @@ public class StickyAssignorTest {
 
     @Test
     public void testOnlyAssignsPartitionsFromSubscribedTopics() {
-        String topic = "topic";
         String otherTopic = "other";
-        String consumerId = "consumer";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
         partitionsPerTopic.put(otherTopic, 3);
-        Map<String, Subscription> subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic)));
+        subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
         assertEquals(partitions(tp(topic, 0), tp(topic, 1), tp(topic, 2)), assignment.get(consumerId));
@@ -116,12 +119,11 @@ public class StickyAssignorTest {
     public void testOneConsumerMultipleTopics() {
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String consumerId = "consumer";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic1, 1);
         partitionsPerTopic.put(topic2, 2);
-        Map<String, Subscription> subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic1, topic2)));
+        subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic1, topic2)));
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
         assertEquals(partitions(tp(topic1, 0), tp(topic2, 0), tp(topic2, 1)), assignment.get(consumerId));
@@ -132,14 +134,12 @@ public class StickyAssignorTest {
 
     @Test
     public void testTwoConsumersOneTopicOnePartition() {
-        String topic = "topic";
         String consumer1 = "consumer1";
         String consumer2 = "consumer2";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 1);
 
-        Map<String, Subscription> subscriptions = new HashMap<>();
         subscriptions.put(consumer1, new Subscription(topics(topic)));
         subscriptions.put(consumer2, new Subscription(topics(topic)));
 
@@ -153,14 +153,12 @@ public class StickyAssignorTest {
 
     @Test
     public void testTwoConsumersOneTopicTwoPartitions() {
-        String topic = "topic";
         String consumer1 = "consumer1";
         String consumer2 = "consumer2";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 2);
 
-        Map<String, Subscription> subscriptions = new HashMap<>();
         subscriptions.put(consumer1, new Subscription(topics(topic)));
         subscriptions.put(consumer2, new Subscription(topics(topic)));
 
@@ -184,7 +182,6 @@ public class StickyAssignorTest {
         partitionsPerTopic.put(topic1, 3);
         partitionsPerTopic.put(topic2, 2);
 
-        Map<String, Subscription> subscriptions = new HashMap<>();
         subscriptions.put(consumer1, new Subscription(topics(topic1)));
         subscriptions.put(consumer2, new Subscription(topics(topic1, topic2)));
         subscriptions.put(consumer3, new Subscription(topics(topic1)));
@@ -209,7 +206,6 @@ public class StickyAssignorTest {
         partitionsPerTopic.put(topic1, 3);
         partitionsPerTopic.put(topic2, 3);
 
-        Map<String, Subscription> subscriptions = new HashMap<>();
         subscriptions.put(consumer1, new Subscription(topics(topic1, topic2)));
         subscriptions.put(consumer2, new Subscription(topics(topic1, topic2)));
 
@@ -223,12 +219,10 @@ public class StickyAssignorTest {
 
     @Test
     public void testAddRemoveConsumerOneTopic() {
-        String topic = "topic";
-        String consumer1 = "consumer";
+        String consumer1 = "consumer1";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
-        Map<String, Subscription> subscriptions = new HashMap<>();
         subscriptions.put(consumer1, new Subscription(topics(topic)));
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
@@ -255,9 +249,8 @@ public class StickyAssignorTest {
                 new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment(
                         new ConsumerUserData(assignment.get(consumer2), Optional.of(assignor.generation())))));
         assignment = assignor.assign(partitionsPerTopic, subscriptions);
-        assertTrue(assignment.get(consumer2).contains(tp(topic, 0)));
-        assertTrue(assignment.get(consumer2).contains(tp(topic, 1)));
-        assertTrue(assignment.get(consumer2).contains(tp(topic, 2)));
+        assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic, 1), tp(topic, 0))),
+                new HashSet<>(assignment.get(consumer2)));
 
         verifyValidityAndBalance(subscriptions, assignment);
         assertTrue(isFullyBalanced(assignment));
@@ -289,11 +282,14 @@ public class StickyAssignorTest {
         for (int i = 1; i <= 5; i++)
             partitionsPerTopic.put(String.format("topic%d", i), (i % 2) + 1);
 
-        Map<String, Subscription> subscriptions = new HashMap<>();
-        subscriptions.put("consumer1", new Subscription(topics("topic1", "topic2", "topic3", "topic4", "topic5")));
-        subscriptions.put("consumer2", new Subscription(topics("topic1", "topic3", "topic5")));
-        subscriptions.put("consumer3", new Subscription(topics("topic1", "topic3", "topic5")));
-        subscriptions.put("consumer4", new Subscription(topics("topic1", "topic2", "topic3", "topic4", "topic5")));
+        subscriptions.put("consumer1",
+                new Subscription(topics("topic1", "topic2", "topic3", "topic4", "topic5")));
+        subscriptions.put("consumer2",
+                new Subscription(topics("topic1", "topic3", "topic5")));
+        subscriptions.put("consumer3",
+                new Subscription(topics("topic1", "topic3", "topic5")));
+        subscriptions.put("consumer4",
+                new Subscription(topics("topic1", "topic2", "topic3", "topic4", "topic5")));
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
@@ -301,13 +297,11 @@ public class StickyAssignorTest {
 
     @Test
     public void testAddRemoveTopicTwoConsumers() {
-        String topic = "topic";
         String consumer1 = "consumer";
         String consumer2 = "consumer2";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
-        Map<String, Subscription> subscriptions = new HashMap<>();
         subscriptions.put(consumer1, new Subscription(topics(topic)));
         subscriptions.put(consumer2, new Subscription(topics(topic)));
 
@@ -368,7 +362,6 @@ public class StickyAssignorTest {
         for (int i = 1; i < 20; i++)
             partitionsPerTopic.put(getTopicName(i, 20), i);
 
-        Map<String, Subscription> subscriptions = new HashMap<>();
         for (int i = 1; i < 20; i++) {
             List<String> topics = new ArrayList<>();
             for (int j = 1; j <= i; j++)
@@ -397,9 +390,9 @@ public class StickyAssignorTest {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put("topic", 20);
 
-        Map<String, Subscription> subscriptions = new HashMap<>();
         for (int i = 1; i < 10; i++)
-            subscriptions.put(getConsumerName(i, 10), new Subscription(topics("topic")));
+            subscriptions.put(getConsumerName(i, 10),
+                    new Subscription(topics("topic")));
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
@@ -418,7 +411,6 @@ public class StickyAssignorTest {
         for (int i = 1; i < 15; i++)
             partitionsPerTopic.put(getTopicName(i, 15), i);
 
-        Map<String, Subscription> subscriptions = new HashMap<>();
         for (int i = 1; i < 9; i++) {
             List<String> topics = new ArrayList<>();
             for (int j = 1; j <= partitionsPerTopic.size(); j++)
@@ -452,7 +444,6 @@ public class StickyAssignorTest {
         for (int i = 0; i < topicCount; i++)
             partitionsPerTopic.put(getTopicName(i, topicCount), rand.nextInt(10) + 1);
 
-        Map<String, Subscription> subscriptions = new HashMap<>();
         for (int i = 0; i < consumerCount; i++) {
             List<String> topics = new ArrayList<>();
             for (int j = 0; j < rand.nextInt(20); j++)
@@ -485,7 +476,6 @@ public class StickyAssignorTest {
         for (int i = 1; i < 5; i++)
             partitionsPerTopic.put(getTopicName(i, 5), 1);
 
-        Map<String, Subscription> subscriptions = new HashMap<>();
         for (int i = 0; i < 3; i++) {
             List<String> topics = new ArrayList<>();
             for (int j = i; j <= 3 * i - 2; j++)
@@ -523,7 +513,6 @@ public class StickyAssignorTest {
 
             int numConsumers = minNumConsumers + new Random().nextInt(maxNumConsumers - minNumConsumers);
 
-            Map<String, Subscription> subscriptions = new HashMap<>();
             for (int i = 0; i < numConsumers; ++i) {
                 List<String> sub = Utils.sorted(getRandomSublist(topics));
                 subscriptions.put(getConsumerName(i, maxNumConsumers), new Subscription(sub));
@@ -554,7 +543,6 @@ public class StickyAssignorTest {
         for (int i = 1; i <= 6; i++)
             partitionsPerTopic.put(String.format("topic%02d", i), 1);
 
-        Map<String, Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer01",
                 new Subscription(topics("topic01", "topic02"),
                         StickyAssignor.serializeTopicPartitionAssignment(
@@ -576,11 +564,15 @@ public class StickyAssignorTest {
     public void testStickiness() {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put("topic01", 3);
-        Map<String, Subscription> subscriptions = new HashMap<>();
-        subscriptions.put("consumer01", new Subscription(topics("topic01")));
-        subscriptions.put("consumer02", new Subscription(topics("topic01")));
-        subscriptions.put("consumer03", new Subscription(topics("topic01")));
-        subscriptions.put("consumer04", new Subscription(topics("topic01")));
+        String consumer1 = "consumer01";
+        String consumer2 = "consumer02";
+        String consumer3 = "consumer03";
+        String consumer4 = "consumer04";
+
+        subscriptions.put(consumer1, new Subscription(topics("topic01")));
+        subscriptions.put(consumer2, new Subscription(topics("topic01")));
+        subscriptions.put(consumer3, new Subscription(topics("topic01")));
+        subscriptions.put(consumer4, new Subscription(topics("topic01")));
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
         verifyValidityAndBalance(subscriptions, assignment);
@@ -597,16 +589,16 @@ public class StickyAssignorTest {
         }
 
         // removing the potential group leader
-        subscriptions.remove("consumer01");
-        subscriptions.put("consumer02",
+        subscriptions.remove(consumer1);
+        subscriptions.put(consumer2,
                 new Subscription(topics("topic01"),
                         StickyAssignor.serializeTopicPartitionAssignment(
                                 new ConsumerUserData(assignment.get("consumer02"), Optional.of(assignor.generation())))));
-        subscriptions.put("consumer03",
+        subscriptions.put(consumer3,
                 new Subscription(topics("topic01"),
                         StickyAssignor.serializeTopicPartitionAssignment(
                                 new ConsumerUserData(assignment.get("consumer03"), Optional.of(assignor.generation())))));
-        subscriptions.put("consumer04",
+        subscriptions.put(consumer4,
                 new Subscription(topics("topic01"),
                         StickyAssignor.serializeTopicPartitionAssignment(
                                 new ConsumerUserData(assignment.get("consumer04"), Optional.of(assignor.generation())))));
@@ -627,13 +619,10 @@ public class StickyAssignorTest {
 
     @Test
     public void testAssignmentUpdatedForDeletedTopic() {
-        String consumerId = "consumer";
-
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put("topic01", 1);
         partitionsPerTopic.put("topic03", 100);
-        Map<String, Subscription> subscriptions =
-                Collections.singletonMap(consumerId, new Subscription(topics("topic01", "topic02", "topic03")));
+        subscriptions = Collections.singletonMap(consumerId, new Subscription(topics("topic01", "topic02", "topic03")));
 
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
         assertEquals(assignment.values().stream().mapToInt(topicPartitions -> topicPartitions.size()).sum(), 1 + 100);
@@ -643,32 +632,27 @@ public class StickyAssignorTest {
 
     @Test
     public void testNoExceptionThrownWhenOnlySubscribedTopicDeleted() {
-        String topic = "topic01";
-        String consumer = "consumer01";
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
-        Map<String, Subscription> subscriptions = new HashMap<>();
-        subscriptions.put(consumer, new Subscription(topics(topic)));
+        subscriptions.put(consumerId, new Subscription(topics(topic)));
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
-        subscriptions.put(consumer,
+        subscriptions.put(consumerId,
                 new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment(
-                        new ConsumerUserData(assignment.get(consumer), Optional.of(1)))));
+                        new ConsumerUserData(assignment.get(consumerId), Optional.of(1)))));
 
         assignment = assignor.assign(Collections.emptyMap(), subscriptions);
         assertEquals(assignment.size(), 1);
-        assertTrue(assignment.get(consumer).isEmpty());
+        assertTrue(assignment.get(consumerId).isEmpty());
     }
 
     @Test
     public void testAssignmentWithMultipleGenerations1() {
-        String topic = "topic";
         String consumer1 = "consumer1";
         String consumer2 = "consumer2";
         String consumer3 = "consumer3";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 6);
-        Map<String, Subscription> subscriptions = new HashMap<>();
         subscriptions.put(consumer1, new Subscription(topics(topic)));
         subscriptions.put(consumer2, new Subscription(topics(topic)));
         subscriptions.put(consumer3, new Subscription(topics(topic)));
@@ -698,7 +682,7 @@ public class StickyAssignorTest {
         assertTrue(isFullyBalanced(assignment));
         assertTrue(assignor.isSticky());
 
-        assertTrue(!Collections.disjoint(r2partitions2, r1partitions3));
+        assertFalse(Collections.disjoint(r2partitions2, r1partitions3));
         subscriptions.remove(consumer1);
         subscriptions.put(consumer2,
                 new Subscription(topics(topic), StickyAssignor.serializeTopicPartitionAssignment(
@@ -718,14 +702,12 @@ public class StickyAssignorTest {
 
     @Test
     public void testAssignmentWithMultipleGenerations2() {
-        String topic = "topic";
         String consumer1 = "consumer1";
         String consumer2 = "consumer2";
         String consumer3 = "consumer3";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 6);
-        Map<String, Subscription> subscriptions = new HashMap<>();
         subscriptions.put(consumer1, new Subscription(topics(topic)));
         subscriptions.put(consumer2, new Subscription(topics(topic)));
         subscriptions.put(consumer3, new Subscription(topics(topic)));
@@ -775,14 +757,12 @@ public class StickyAssignorTest {
 
     @Test
     public void testAssignmentWithConflictingPreviousGenerations() {
-        String topic = "topic";
         String consumer1 = "consumer1";
         String consumer2 = "consumer2";
         String consumer3 = "consumer3";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 6);
-        Map<String, Subscription> subscriptions = new HashMap<>();
         subscriptions.put(consumer1, new Subscription(topics(topic)));
         subscriptions.put(consumer2, new Subscription(topics(topic)));
         subscriptions.put(consumer3, new Subscription(topics(topic)));
@@ -822,14 +802,12 @@ public class StickyAssignorTest {
 
     @Test
     public void testSchemaBackwardCompatibility() {
-        String topic = "topic";
         String consumer1 = "consumer1";
         String consumer2 = "consumer2";
         String consumer3 = "consumer3";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 3);
-        Map<String, Subscription> subscriptions = new HashMap<>();
         subscriptions.put(consumer1, new Subscription(topics(topic)));
         subscriptions.put(consumer2, new Subscription(topics(topic)));
         subscriptions.put(consumer3, new Subscription(topics(topic)));
@@ -860,13 +838,11 @@ public class StickyAssignorTest {
 
     @Test
     public void testConflictingPreviousAssignments() {
-        String topic = "topic";
         String consumer1 = "consumer1";
         String consumer2 = "consumer2";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         partitionsPerTopic.put(topic, 2);
-        Map<String, Subscription> subscriptions = new HashMap<>();
         subscriptions.put(consumer1, new Subscription(topics(topic)));
         subscriptions.put(consumer2, new Subscription(topics(topic)));
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignorTest.java
new file mode 100644
index 0000000..e5f7329
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.MemberInfo;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class AbstractPartitionAssignorTest {
+
+    @Test
+    public void testMemberInfoSortingWithoutGroupInstanceId() {
+        MemberInfo m1 = new MemberInfo("a", Optional.empty());
+        MemberInfo m2 = new MemberInfo("b", Optional.empty());
+        MemberInfo m3 = new MemberInfo("c", Optional.empty());
+
+        List<MemberInfo> memberInfoList = Arrays.asList(m1, m2, m3);
+        assertEquals(memberInfoList, Utils.sorted(memberInfoList));
+    }
+
+    @Test
+    public void testMemberInfoSortingWithAllGroupInstanceId() {
+        MemberInfo m1 = new MemberInfo("a", Optional.of("y"));
+        MemberInfo m2 = new MemberInfo("b", Optional.of("z"));
+        MemberInfo m3 = new MemberInfo("c", Optional.of("x"));
+
+        List<MemberInfo> memberInfoList = Arrays.asList(m1, m2, m3);
+        assertEquals(Arrays.asList(m3, m1, m2), Utils.sorted(memberInfoList));
+    }
+
+    @Test
+    public void testMemberInfoSortingSomeGroupInstanceId() {
+        MemberInfo m1 = new MemberInfo("a", Optional.empty());
+        MemberInfo m2 = new MemberInfo("b", Optional.of("y"));
+        MemberInfo m3 = new MemberInfo("c", Optional.of("x"));
+
+        List<MemberInfo> memberInfoList = Arrays.asList(m1, m2, m3);
+        assertEquals(Arrays.asList(m3, m2, m1), Utils.sorted(memberInfoList));
+    }
+
+    @Test
+    public void testMergeSortManyMemberInfo() {
+        Random rand = new Random();
+        int bound = 2;
+        List<MemberInfo> memberInfoList = new ArrayList<>();
+        List<MemberInfo> staticMemberList = new ArrayList<>();
+        List<MemberInfo> dynamicMemberList = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            // Need to make sure all the ids are defined as 3-digits otherwise
+            // the comparison result will break.
+            String id = Integer.toString(i + 100);
+            Optional<String> groupInstanceId = rand.nextInt(bound) < bound / 2 ?
+                                                       Optional.of(id) : Optional.empty();
+            MemberInfo m = new MemberInfo(id, groupInstanceId);
+            memberInfoList.add(m);
+            if (m.groupInstanceId.isPresent()) {
+                staticMemberList.add(m);
+            } else {
+                dynamicMemberList.add(m);
+            }
+        }
+        staticMemberList.addAll(dynamicMemberList);
+        Collections.shuffle(memberInfoList);
+        assertEquals(staticMemberList, Utils.sorted(memberInfoList));
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 284e396..5fa6653 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.test.MockKeyValueStoreBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -97,6 +98,8 @@ public class StreamsPartitionAssignorTest {
             new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
     );
 
+    private final Set<TaskId> emptyTasks = Collections.emptySet();
+
     private final Cluster metadata = new Cluster(
         "cluster",
         Collections.singletonList(Node.noNode()),
@@ -143,6 +146,17 @@ public class StreamsPartitionAssignorTest {
         EasyMock.replay(taskManager);
     }
 
+    private Map<String, PartitionAssignor.Subscription> subscriptions;
+
+    @Before
+    public void setUp() {
+        if (subscriptions != null) {
+            subscriptions.clear();
+        } else {
+            subscriptions = new HashMap<>();
+        }
+    }
+
     @Test
     public void shouldInterleaveTasksByGroupId() {
         final TaskId taskIdA0 = new TaskId(0, 0);
@@ -221,14 +235,15 @@ public class StreamsPartitionAssignorTest {
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));
-
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));
 
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -304,11 +319,12 @@ public class StreamsPartitionAssignorTest {
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode()));
+                          new PartitionAssignor.Subscription(topics,
+                                  new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode()));
         subscriptions.put("consumer11",
-                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode()));
+                          new PartitionAssignor.Subscription(topics,
+                                  new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode()));
 
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(localMetadata, subscriptions);
 
@@ -342,16 +358,16 @@ public class StreamsPartitionAssignorTest {
 
         final UUID uuid1 = UUID.randomUUID();
 
-        mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder);
+        mockTaskManager(emptyTasks, emptyTasks, uuid1, builder);
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class));
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        subscriptions.put("consumer10",
-            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode()));
-
 
         // will throw exception if it fails
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()
+        ));
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
         // check assignment info
@@ -382,9 +398,10 @@ public class StreamsPartitionAssignorTest {
         mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder);
         configurePartitionAssignor(Collections.emptyMap());
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()
+                ));
 
         // initially metadata is empty
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyMetadata, subscriptions);
@@ -398,7 +415,6 @@ public class StreamsPartitionAssignorTest {
         final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
 
         assertEquals(0, allActiveTasks.size());
-        assertEquals(Collections.emptySet(), new HashSet<>(allActiveTasks));
 
         // then metadata gets populated
         assignments = partitionAssignor.assign(metadata, subscriptions);
@@ -433,18 +449,20 @@ public class StreamsPartitionAssignorTest {
 
         final UUID uuid1 = UUID.randomUUID();
         final UUID uuid2 = UUID.randomUUID();
-        mockTaskManager(prevTasks10, Collections.emptySet(), uuid1, builder);
+        mockTaskManager(prevTasks10, emptyTasks, uuid1, builder);
         configurePartitionAssignor(Collections.emptyMap());
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.emptySet(), userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, prevTasks10, emptyTasks, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.emptySet(), userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, prevTasks11, emptyTasks, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.emptySet(), userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid2, prevTasks20, emptyTasks, userEndPoint).encode()));
 
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -494,21 +512,23 @@ public class StreamsPartitionAssignorTest {
         final UUID uuid2 = UUID.randomUUID();
 
         mockTaskManager(
-            Collections.emptySet(),
-            Collections.emptySet(),
+            emptyTasks,
+            emptyTasks,
             uuid1,
             builder);
         configurePartitionAssignor(Collections.emptyMap());
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.emptySet(), Collections.emptySet(), userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid2, emptyTasks, emptyTasks, userEndPoint).encode()));
 
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -588,13 +608,15 @@ public class StreamsPartitionAssignorTest {
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks01, standbyTasks02, userEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, prevTasks01, standbyTasks02, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode()));
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode()));
 
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -676,16 +698,15 @@ public class StreamsPartitionAssignorTest {
         final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         final UUID uuid1 = UUID.randomUUID();
-        mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder);
+        mockTaskManager(emptyTasks, emptyTasks, uuid1, builder);
         configurePartitionAssignor(Collections.emptyMap());
         final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        final Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
-
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())
+        );
         partitionAssignor.assign(metadata, subscriptions);
 
         // check prepared internal topics
@@ -710,17 +731,16 @@ public class StreamsPartitionAssignorTest {
         final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         final UUID uuid1 = UUID.randomUUID();
-        mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder);
+        mockTaskManager(emptyTasks, emptyTasks, uuid1, builder);
 
         configurePartitionAssignor(Collections.emptyMap());
         final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        final Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
-
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())
+        );
         partitionAssignor.assign(metadata, subscriptions);
 
         // check prepared internal topics
@@ -758,8 +778,8 @@ public class StreamsPartitionAssignorTest {
         internalTopologyBuilder.setApplicationId(applicationId);
 
         mockTaskManager(
-            Collections.emptySet(),
-            Collections.emptySet(),
+            emptyTasks,
+            emptyTasks,
             UUID.randomUUID(),
             internalTopologyBuilder);
         configurePartitionAssignor(Collections.emptyMap());
@@ -769,16 +789,11 @@ public class StreamsPartitionAssignorTest {
             mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        final Set<TaskId> emptyTasks = Collections.emptySet();
-        subscriptions.put(
-            client,
-            new PartitionAssignor.Subscription(
-                asList("topic1", "topic3"),
-                new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()
-            )
+        subscriptions.put(client,
+                new PartitionAssignor.Subscription(
+                        asList("topic1", "topic3"),
+                        new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode())
         );
-
         final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
 
         final Map<String, Integer> expectedCreatedInternalTopics = new HashMap<>();
@@ -821,8 +836,8 @@ public class StreamsPartitionAssignorTest {
 
         final UUID uuid1 = UUID.randomUUID();
         mockTaskManager(
-            Collections.emptySet(),
-            Collections.emptySet(),
+            emptyTasks,
+            emptyTasks,
             uuid1,
             builder);
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
@@ -842,16 +857,15 @@ public class StreamsPartitionAssignorTest {
 
         final UUID uuid1 = UUID.randomUUID();
 
-        mockTaskManager(Collections.emptySet(), Collections.emptySet(), uuid1, builder);
+        mockTaskManager(emptyTasks, emptyTasks, uuid1, builder);
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        final Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put("consumer1",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
-
+                new PartitionAssignor.Subscription(topics,
+                        new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())
+        );
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
         final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
         final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
@@ -868,7 +882,7 @@ public class StreamsPartitionAssignorTest {
     public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
         builder.setApplicationId(applicationId);
 
-        mockTaskManager(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), builder);
+        mockTaskManager(emptyTasks, emptyTasks, UUID.randomUUID(), builder);
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         try {
@@ -935,8 +949,8 @@ public class StreamsPartitionAssignorTest {
         internalTopologyBuilder.setApplicationId(applicationId);
 
         mockTaskManager(
-            Collections.emptySet(),
-            Collections.emptySet(),
+            emptyTasks,
+            emptyTasks,
             UUID.randomUUID(),
             internalTopologyBuilder);
         configurePartitionAssignor(Collections.emptyMap());
@@ -946,16 +960,11 @@ public class StreamsPartitionAssignorTest {
             mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        final Set<TaskId> emptyTasks = Collections.emptySet();
-        subscriptions.put(
-            client,
-            new PartitionAssignor.Subscription(
-                Collections.singletonList("unknownTopic"),
-                new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()
-            )
+        subscriptions.put(client,
+                new PartitionAssignor.Subscription(
+                        Collections.singletonList("unknownTopic"),
+                        new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode())
         );
-
         final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
 
         assertThat(mockInternalTopicManager.readyTopics.isEmpty(), equalTo(true));
@@ -992,8 +1001,8 @@ public class StreamsPartitionAssignorTest {
 
         final UUID uuid = UUID.randomUUID();
         mockTaskManager(
-            Collections.emptySet(),
-            Collections.emptySet(),
+            emptyTasks,
+            emptyTasks,
             uuid,
             internalTopologyBuilder);
 
@@ -1005,22 +1014,15 @@ public class StreamsPartitionAssignorTest {
             streamsConfig,
             mockClientSupplier.restoreConsumer));
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        final Set<TaskId> emptyTasks = Collections.emptySet();
-        subscriptions.put(
-                "consumer1",
+        subscriptions.put("consumer1",
                 new PartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
-                        new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()
-                )
+                        new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode())
         );
-
-        subscriptions.put(
-                "consumer2",
+        subscriptions.put("consumer2",
                 new PartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
-                        new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode()
-                )
+                        new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode())
         );
         final Set<TopicPartition> allPartitions = Utils.mkSet(t1p0, t1p1, t1p2);
         final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, subscriptions);
@@ -1106,21 +1108,16 @@ public class StreamsPartitionAssignorTest {
 
     private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int smallestVersion,
                                                                                      final int otherVersion) {
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        final Set<TaskId> emptyTasks = Collections.emptySet();
-        subscriptions.put(
-            "consumer1",
-            new PartitionAssignor.Subscription(
-                Collections.singletonList("topic1"),
-                new SubscriptionInfo(smallestVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
-            )
+        subscriptions.put("consumer1",
+                new PartitionAssignor.Subscription(
+                        Collections.singletonList("topic1"),
+                        new SubscriptionInfo(smallestVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode())
         );
-        subscriptions.put(
-            "consumer2",
-            new PartitionAssignor.Subscription(
-                Collections.singletonList("topic1"),
-                new SubscriptionInfo(otherVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
-            )
+        subscriptions.put("consumer2",
+                new PartitionAssignor.Subscription(
+                        Collections.singletonList("topic1"),
+                        new SubscriptionInfo(otherVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+                )
         );
 
         mockTaskManager(
@@ -1138,8 +1135,6 @@ public class StreamsPartitionAssignorTest {
 
     @Test
     public void shouldDownGradeSubscriptionToVersion1() {
-        final Set<TaskId> emptyTasks = Collections.emptySet();
-
         mockTaskManager(
             emptyTasks,
             emptyTasks,
@@ -1178,8 +1173,6 @@ public class StreamsPartitionAssignorTest {
     }
 
     private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue) {
-        final Set<TaskId> emptyTasks = Collections.emptySet();
-
         mockTaskManager(
             emptyTasks,
             emptyTasks,
@@ -1196,7 +1189,6 @@ public class StreamsPartitionAssignorTest {
     public void shouldReturnUnchangedAssignmentForOldInstancesAndEmptyAssignmentForFutureInstances() {
         builder.addSource(null, "source1", null, null, null, "topic1");
 
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         final Set<TaskId> activeTasks = Utils.mkSet(task0, task1);
@@ -1207,19 +1199,15 @@ public class StreamsPartitionAssignorTest {
             }
         };
 
-        subscriptions.put(
-            "consumer1",
-            new PartitionAssignor.Subscription(
-                Collections.singletonList("topic1"),
-                new SubscriptionInfo(UUID.randomUUID(), activeTasks, standbyTasks, null).encode()
-            )
+        subscriptions.put("consumer1",
+                new PartitionAssignor.Subscription(
+                        Collections.singletonList("topic1"),
+                        new SubscriptionInfo(UUID.randomUUID(), activeTasks, standbyTasks, null).encode())
         );
-        subscriptions.put(
-            "future-consumer",
-            new PartitionAssignor.Subscription(
-                Collections.singletonList("topic1"),
-                encodeFutureSubscription()
-            )
+        subscriptions.put("future-consumer",
+                new PartitionAssignor.Subscription(
+                        Collections.singletonList("topic1"),
+                        encodeFutureSubscription())
         );
 
         mockTaskManager(
@@ -1263,21 +1251,15 @@ public class StreamsPartitionAssignorTest {
     }
 
     private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) {
-        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        final Set<TaskId> emptyTasks = Collections.emptySet();
-        subscriptions.put(
-            "consumer1",
-            new PartitionAssignor.Subscription(
-                Collections.singletonList("topic1"),
-                new SubscriptionInfo(oldVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
-            )
+        subscriptions.put("consumer1",
+                new PartitionAssignor.Subscription(
+                        Collections.singletonList("topic1"),
+                        new SubscriptionInfo(oldVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode())
         );
-        subscriptions.put(
-            "future-consumer",
-            new PartitionAssignor.Subscription(
-                Collections.singletonList("topic1"),
-                encodeFutureSubscription()
-            )
+        subscriptions.put("future-consumer",
+                new PartitionAssignor.Subscription(
+                        Collections.singletonList("topic1"),
+                        encodeFutureSubscription())
         );
 
         mockTaskManager(