Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/23 22:22:24 UTC

[kafka] branch trunk updated: KAFKA-8179: Part 2, ConsumerCoordinator Algorithm (#6778)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e5c4ebd  KAFKA-8179: Part 2, ConsumerCoordinator Algorithm (#6778)
e5c4ebd is described below

commit e5c4ebdd7433d746a46d6962ee04ff2d782d892b
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sun Jun 23 15:22:07 2019 -0700

    KAFKA-8179: Part 2, ConsumerCoordinator Algorithm (#6778)
    
    1. In ConsumerCoordinator, select the rebalance protocol that is commonly supported by all configured assignor instances; if several protocols are commonly supported, pick the one with the highest id (see the sketch after this list).
    1.a. In onJoinComplete: with EAGER, call onPartitionsAssigned; with COOPERATIVE, call onPartitionsRevoked after onPartitionsAssigned, and then request a re-join if the assignment error indicates so.
    1.b. In onJoinPrepare: only call onPartitionsRevoked with EAGER.
    1.c. In performAssignment: adjust the assignments returned by the user's assignor by excluding all partitions that are still owned by some other member.
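    
    For illustration, a minimal self-contained sketch of the selection rule; the Assignor interface and class names here are simplified stand-ins, and the real logic lives in the ConsumerCoordinator constructor in the diff below:
    
        import java.util.ArrayList;
        import java.util.Collections;
        import java.util.List;
        
        enum RebalanceProtocol { EAGER, COOPERATIVE }   // ordinal acts as the protocol id
        
        interface Assignor {
            List<RebalanceProtocol> supportedProtocols();
        }
        
        class ProtocolSelection {
            static RebalanceProtocol select(List<Assignor> assignors) {
                // start from the first assignor's protocols and intersect with the rest
                List<RebalanceProtocol> common = new ArrayList<>(assignors.get(0).supportedProtocols());
                for (Assignor assignor : assignors)
                    common.retainAll(assignor.supportedProtocols());
                if (common.isEmpty())
                    throw new IllegalArgumentException("Assignors have no commonly supported rebalance protocol");
                // the highest id (enum ordinal) wins, i.e. the most advanced protocol
                Collections.sort(common);
                return common.get(common.size() - 1);
            }
        }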
    
    2. I've refactored the Subscription / Assignment classes so that the assigned partitions, error code, and group instance id are no longer final and can be updated. As for the last one, it is directly related to the logic of this PR, but I felt it was more convenient to change it along with the other fields (see the sketch below).
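    
    A minimal sketch of the mutability change, using simplified stand-in classes rather than the real PartitionAssignor.Subscription / Assignment; the setter names mirror the ones added in this patch:
    
        import java.util.List;
        import java.util.Optional;
        
        // fields that used to be final can now be updated after construction
        class SubscriptionSketch {
            private final List<String> topics;
            private Optional<String> groupInstanceId = Optional.empty();   // no longer final
        
            SubscriptionSketch(List<String> topics) { this.topics = topics; }
        
            // set by the leader after deserializing the subscription from the join response
            void setGroupInstanceId(Optional<String> groupInstanceId) { this.groupInstanceId = groupInstanceId; }
        }
        
        class AssignmentSketch {
            private List<String> partitions;        // no longer final (topic-partition names, simplified)
            private String error = "NONE";          // no longer final
        
            AssignmentSketch(List<String> partitions) { this.partitions = partitions; }
        
            void updatePartitions(List<String> partitions) { this.partitions = partitions; }   // used when adjusting assignments
            void setError(String error) { this.error = error; }                                // e.g. "NEED_REJOIN"
        }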
    
    3. Testing: primarily in ConsumerCoordinatorTest, parameterize the test class by rebalance protocol and add the necessary scenarios for the COOPERATIVE protocol (see the sketch below).
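    
    A minimal sketch of the parameterization pattern with JUnit 4's Parameterized runner, as used in the diff below; the class and enum here are simplified stand-ins:
    
        import java.util.ArrayList;
        import java.util.Collection;
        import java.util.List;
        
        import org.junit.Test;
        import org.junit.runner.RunWith;
        import org.junit.runners.Parameterized;
        
        // the same test methods run once per rebalance protocol
        @RunWith(Parameterized.class)
        public class ProtocolParameterizedTestSketch {
            enum RebalanceProtocol { EAGER, COOPERATIVE }   // stand-in for the real enum
        
            private final RebalanceProtocol protocol;
        
            public ProtocolParameterizedTestSketch(final RebalanceProtocol protocol) {
                this.protocol = protocol;
            }
        
            @Parameterized.Parameters(name = "rebalance protocol = {0}")
            public static Collection<Object[]> data() {
                final List<Object[]> values = new ArrayList<>();
                for (final RebalanceProtocol protocol : RebalanceProtocol.values())
                    values.add(new Object[]{protocol});
                return values;
            }
        
            @Test
            public void exampleScenario() {
                // assertions can branch on 'protocol', e.g. expected revoke counts differ between EAGER and COOPERATIVE
            }
        }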
    
    I intentionally omitted the documentation changes since some behavioral updates still need to be finalized in later PRs; the docs will therefore also be added in later PRs.
    
    Reviewers: Bill Bejeck <bb...@gmail.com>, Boyang Chen <bo...@confluent.io>, Sophie Blee-Goldman <so...@confluent.io>
---
 .../consumer/internals/ConsumerCoordinator.java    | 181 +++++-
 .../consumer/internals/ConsumerProtocol.java       |  58 +-
 .../consumer/internals/PartitionAssignor.java      |  48 +-
 .../java/org/apache/kafka/clients/MockClient.java  |   2 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   2 +-
 .../internals/ConsumerCoordinatorTest.java         | 623 ++++++++++++---------
 .../consumer/internals/ConsumerProtocolTest.java   |  47 +-
 .../consumer/internals/MockPartitionAssignor.java  |  11 +
 .../test/java/org/apache/kafka/test/TestUtils.java |   5 +
 9 files changed, 624 insertions(+), 353 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 477f24b..48d0c95 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor.RebalanceProtocol;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -119,6 +120,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
+    private final RebalanceProtocol protocol;
+
     /**
      * Initialize the coordination manager.
      */
@@ -158,6 +161,32 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (autoCommitEnabled)
             this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs);
 
+        // select the rebalance protocol such that:
+        //   1. only consider protocols that are supported by all the assignors; if there are no commonly supported
+        //      protocols across all the assignors, throw an exception.
+        //   2. if there are multiple commonly supported protocols, select the one with the highest id (i.e. the
+        //      id number indicates how advanced the protocol is).
+        // the assignors list may only be empty with manual partition assignment, in which case no protocol is needed
+        if (!assignors.isEmpty()) {
+            List<RebalanceProtocol> supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols());
+
+            for (PartitionAssignor assignor : assignors) {
+                supportedProtocols.retainAll(assignor.supportedProtocols());
+            }
+
+            if (supportedProtocols.isEmpty()) {
+                throw new IllegalArgumentException("Specified assignors " +
+                    assignors.stream().map(PartitionAssignor::name).collect(Collectors.toSet()) +
+                    " do not have a commonly supported rebalance protocol");
+            }
+
+            Collections.sort(supportedProtocols);
+
+            protocol = supportedProtocols.get(supportedProtocols.size() - 1);
+        } else {
+            protocol = null;
+        }
+
         this.metadata.requestUpdate();
     }
 
@@ -236,6 +265,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
 
+        Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+
         Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
         if (!subscriptions.assignFromSubscribed(assignment.partitions())) {
             log.warn("We received an assignment {} that doesn't match our current subscription {}; it is likely " +
@@ -262,14 +293,66 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         // execute the user's callback after rebalance
         ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
-        log.info("Setting newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
+
+        switch (protocol) {
+            case EAGER:
+                if (!ownedPartitions.isEmpty()) {
+                    log.info("The consumer still owns partitions {} that were not revoked under the {} protocol; " +
+                        "it is likely the client was woken up before a previous pending rebalance completed its callback", ownedPartitions, protocol);
+                }
+
+                log.info("Setting newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
+                try {
+                    listener.onPartitionsAssigned(assignedPartitions);
+                } catch (WakeupException | InterruptException e) {
+                    throw e;
+                } catch (Exception e) {
+                    log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
+                }
+                break;
+
+            case COOPERATIVE:
+                assignAndRevoke(listener, assignedPartitions, ownedPartitions);
+
+                if (assignment.error() == ConsumerProtocol.AssignmentError.NEED_REJOIN) {
+                    requestRejoin();
+                }
+
+                break;
+        }
+
+    }
+
+    private void assignAndRevoke(final ConsumerRebalanceListener listener,
+                                 final Set<TopicPartition> assignedPartitions,
+                                 final Set<TopicPartition> ownedPartitions) {
+        Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
+        Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
+        addedPartitions.removeAll(ownedPartitions);
+        revokedPartitions.removeAll(assignedPartitions);
+
+        log.info("Updating with newly assigned partitions: {}, compared with previously owned partitions: {}, " +
+                "newly added partitions: {}, partitions to revoke: {}",
+            Utils.join(assignedPartitions, ", "),
+            Utils.join(ownedPartitions, ", "),
+            Utils.join(addedPartitions, ", "),
+            Utils.join(revokedPartitions, ", "));
+
         try {
-            listener.onPartitionsAssigned(assignedPartitions);
+            listener.onPartitionsAssigned(addedPartitions);
         } catch (WakeupException | InterruptException e) {
             throw e;
         } catch (Exception e) {
             log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
         }
+
+        try {
+            listener.onPartitionsRevoked(revokedPartitions);
+        } catch (WakeupException | InterruptException e) {
+            throw e;
+        } catch (Exception e) {
+            log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
+        }
     }
 
     void maybeUpdateSubscriptionMetadata() {
@@ -302,6 +385,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         invokeCompletedOffsetCommitCallbacks();
 
         if (subscriptions.partitionsAutoAssigned()) {
+            if (protocol == null) {
+                throw new IllegalStateException("User configured ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG to be empty " +
+                    "while trying to subscribe to topics and have the group protocol auto-assign partitions");
+            }
             // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
             // group proactively due to application inactivity even if (say) the coordinator cannot be found.
             pollHeartbeat(timer.currentTimeMs());
@@ -389,11 +476,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         Set<String> allSubscribedTopics = new HashSet<>();
         Map<String, Subscription> subscriptions = new HashMap<>();
+        // collect all the owned partitions
+        Map<TopicPartition, String> ownedPartitions = new HashMap<>();
         for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
-            Subscription subscription = ConsumerProtocol.buildSubscription(ByteBuffer.wrap(memberSubscription.metadata()),
-                                                                           Optional.ofNullable(memberSubscription.groupInstanceId()));
+            Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));
+            subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));
             subscriptions.put(memberSubscription.memberId(), subscription);
             allSubscribedTopics.addAll(subscription.topics());
+            ownedPartitions.putAll(subscription.ownedPartitions().stream().collect(Collectors.toMap(item -> item, item -> memberSubscription.memberId())));
         }
 
         // the leader will begin watching for changes to any of the topics the group is interested in,
@@ -404,7 +494,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         log.debug("Performing assignment using strategy {} with subscriptions {}", assignor.name(), subscriptions);
 
-        Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
+        Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), subscriptions);
+
+        switch (protocol) {
+            case EAGER:
+                break;
+
+            case COOPERATIVE:
+                adjustAssignment(ownedPartitions, assignments);
+                break;
+        }
 
         // user-customized assignor may have created some topics that are not in the subscription list
         // and assign their partitions to the members; in this case we would like to update the leader's
@@ -414,7 +513,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // TODO: this is a hack and not something we want to support long-term unless we push regex into the protocol
         //       we may need to modify the PartitionAssignor API to better support this case.
         Set<String> assignedTopics = new HashSet<>();
-        for (Assignment assigned : assignment.values()) {
+        for (Assignment assigned : assignments.values()) {
             for (TopicPartition tp : assigned.partitions())
                 assignedTopics.add(tp.topic());
         }
@@ -437,10 +536,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         assignmentSnapshot = metadataSnapshot;
 
-        log.debug("Finished assignment for group: {}", assignment);
+        log.debug("Finished assignment for group: {}", assignments);
 
         Map<String, ByteBuffer> groupAssignment = new HashMap<>();
-        for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
+        for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
             ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
             groupAssignment.put(assignmentEntry.getKey(), buffer);
         }
@@ -448,6 +547,40 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         return groupAssignment;
     }
 
+    private void adjustAssignment(final Map<TopicPartition, String> ownedPartitions,
+                                  final Map<String, Assignment> assignments) {
+        boolean revocationsNeeded = false;
+        Set<TopicPartition> assignedPartitions = new HashSet<>();
+        for (final Map.Entry<String, Assignment> entry : assignments.entrySet()) {
+            final Assignment assignment = entry.getValue();
+            assignedPartitions.addAll(assignment.partitions());
+
+            // exclude from the new assignment any partition that is still owned by a different member
+            List<TopicPartition> updatedPartitions = assignment.partitions().stream()
+                .filter(tp -> !ownedPartitions.containsKey(tp) || entry.getKey().equals(ownedPartitions.get(tp)))
+                .collect(Collectors.toList());
+            if (!updatedPartitions.equals(assignment.partitions())) {
+                assignment.updatePartitions(updatedPartitions);
+                revocationsNeeded = true;
+            }
+        }
+
+        // for all owned but not assigned partitions, blindly add them to assignment
+        for (final Map.Entry<TopicPartition, String> entry : ownedPartitions.entrySet()) {
+            final TopicPartition tp = entry.getKey();
+            if (!assignedPartitions.contains(tp)) {
+                assignments.get(entry.getValue()).partitions().add(tp);
+            }
+        }
+
+        // if revocations are triggered, tell everyone to re-join immediately.
+        if (revocationsNeeded) {
+            for (final Assignment assignment : assignments.values()) {
+                assignment.setError(ConsumerProtocol.AssignmentError.NEED_REJOIN);
+            }
+        }
+    }
+
     @Override
     protected void onJoinPrepare(int generation, String memberId) {
         // commit offsets prior to rebalance if auto-commit enabled
@@ -455,14 +588,26 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         // execute the user's callback before rebalance
         ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
-        Set<TopicPartition> revoked = subscriptions.assignedPartitions();
-        log.info("Revoking previously assigned partitions {}", revoked);
-        try {
-            listener.onPartitionsRevoked(revoked);
-        } catch (WakeupException | InterruptException e) {
-            throw e;
-        } catch (Exception e) {
-            log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
+
+        switch (protocol) {
+            case EAGER:
+                Set<TopicPartition> revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+                log.info("Revoking previously assigned partitions {}", revokedPartitions);
+                try {
+                    listener.onPartitionsRevoked(revokedPartitions);
+                } catch (WakeupException | InterruptException e) {
+                    throw e;
+                } catch (Exception e) {
+                    log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
+                }
+
+                // also clear the assigned partitions since all have been revoked
+                subscriptions.assignFromSubscribed(Collections.emptySet());
+
+                break;
+
+            case COOPERATIVE:
+                break;
         }
 
         isLeader = false;
@@ -1047,4 +1192,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
+    /* test-only methods below */
+    RebalanceProtocol getProtocol() {
+        return protocol;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index 9e7961e..d05d5b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -27,12 +27,8 @@ import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 
 /**
  * ConsumerProtocol contains the schemas for consumer subscriptions and assignments for use with
@@ -56,7 +52,22 @@ import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
  *   ErrorCode          => [int16]
  * </pre>
  *
- * Older versioned formats can be inferred by reading the code below.
+ * Version 0 format:
+ *
+ * <pre>
+ * Subscription => Version Topics
+ *   Version    => Int16
+ *   Topics     => [String]
+ *   UserData   => Bytes
+ *
+ * Assignment => Version TopicPartitions
+ *   Version            => int16
+ *   AssignedPartitions => [Topic Partitions]
+ *     Topic            => String
+ *     Partitions       => [int32]
+ *   UserData           => Bytes
+ * </pre>
+ *
  *
  * The current implementation assumes that future versions will not break compatibility. When
  * it encounters a newer version, it parses it using the current format. This basically means
@@ -74,6 +85,8 @@ public class ConsumerProtocol {
     public static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions";
     public static final String USER_DATA_KEY_NAME = "user_data";
 
+    public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Assignment error code");
+
     public static final short CONSUMER_PROTOCOL_V0 = 0;
     public static final short CONSUMER_PROTOCOL_V1 = 1;
 
@@ -106,13 +119,13 @@ public class ConsumerProtocol {
         new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES),
         ERROR_CODE);
 
-    public enum Errors {
+    public enum AssignmentError {
         NONE(0),
         NEED_REJOIN(1);
 
         private final short code;
 
-        Errors(final int code) {
+        AssignmentError(final int code) {
             this.code = (short) code;
         }
 
@@ -120,7 +133,7 @@ public class ConsumerProtocol {
             return code;
         }
 
-        public static Errors fromCode(final short code) {
+        public static AssignmentError fromCode(final short code) {
             switch (code) {
                 case 0:
                     return NONE;
@@ -179,20 +192,17 @@ public class ConsumerProtocol {
         }
     }
 
-    public static PartitionAssignor.Subscription buildSubscriptionV0(ByteBuffer buffer, Optional<String> groupInstanceId) {
+    public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer) {
         Struct struct = SUBSCRIPTION_V0.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<String> topics = new ArrayList<>();
         for (Object topicObj : struct.getArray(TOPICS_KEY_NAME))
             topics.add((String) topicObj);
-        return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0,
-                                                  topics,
-                                                  userData,
-                                                  Collections.emptyList(),
-                                                  groupInstanceId);
+
+        return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0, topics, userData);
     }
 
-    public static PartitionAssignor.Subscription buildSubscriptionV1(ByteBuffer buffer, Optional<String> groupInstanceId) {
+    public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer) {
         Struct struct = SUBSCRIPTION_V1.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<String> topics = new ArrayList<>();
@@ -208,14 +218,10 @@ public class ConsumerProtocol {
             }
         }
 
-        return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1,
-                                                  topics,
-                                                  userData,
-                                                  ownedPartitions,
-                                                  groupInstanceId);
+        return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
     }
 
-    public static PartitionAssignor.Subscription buildSubscription(ByteBuffer buffer, Optional<String> groupInstanceId) {
+    public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) {
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
 
@@ -224,14 +230,14 @@ public class ConsumerProtocol {
 
         switch (version) {
             case CONSUMER_PROTOCOL_V0:
-                return buildSubscriptionV0(buffer, groupInstanceId);
+                return deserializeSubscriptionV0(buffer);
 
             case CONSUMER_PROTOCOL_V1:
-                return buildSubscriptionV1(buffer, groupInstanceId);
+                return deserializeSubscriptionV1(buffer);
 
             // assume all higher versions can be parsed as V1
             default:
-                return buildSubscriptionV1(buffer, groupInstanceId);
+                return deserializeSubscriptionV1(buffer);
         }
     }
 
@@ -267,7 +273,7 @@ public class ConsumerProtocol {
             topicAssignments.add(topicAssignment);
         }
         struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-        struct.set(ERROR_CODE.name, assignment.error().code);
+        struct.set(ERROR_CODE, assignment.error().code);
 
         ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V1.sizeOf() + ASSIGNMENT_V1.sizeOf(struct));
         CONSUMER_PROTOCOL_HEADER_V1.writeTo(buffer);
@@ -316,7 +322,7 @@ public class ConsumerProtocol {
             }
         }
 
-        Errors error = Errors.fromCode(struct.get(ERROR_CODE));
+        AssignmentError error = AssignmentError.fromCode(struct.get(ERROR_CODE));
 
         return new PartitionAssignor.Assignment(CONSUMER_PROTOCOL_V1, partitions, userData, error);
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
index 921a55b..c26f684 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
@@ -80,7 +80,7 @@ public interface PartitionAssignor {
     }
 
     /**
-     * Indicate which rebalance protocol this assignor can would work with;
+     * Indicate which rebalance protocol this assignor works with;
      * By default it should always work with {@link RebalanceProtocol#EAGER}.
      */
     default List<RebalanceProtocol> supportedProtocols() {
@@ -88,7 +88,7 @@ public interface PartitionAssignor {
     }
 
     /**
-     * Return the version of the assignor which indicate how the user metadata encodings
+     * Return the version of the assignor which indicates how the user metadata encodings
      * and the assignment algorithm gets evolved.
      */
     default short version() {
@@ -131,18 +131,17 @@ public interface PartitionAssignor {
         private final List<String> topics;
         private final ByteBuffer userData;
         private final List<TopicPartition> ownedPartitions;
-        private final Optional<String> groupInstanceId;
+        private Optional<String> groupInstanceId;
 
         Subscription(Short version,
                      List<String> topics,
                      ByteBuffer userData,
-                     List<TopicPartition> ownedPartitions,
-                     Optional<String> groupInstanceId) {
+                     List<TopicPartition> ownedPartitions) {
             this.version = version;
             this.topics = topics;
             this.userData = userData;
             this.ownedPartitions = ownedPartitions;
-            this.groupInstanceId = groupInstanceId;
+            this.groupInstanceId = Optional.empty();
 
             if (version < CONSUMER_PROTOCOL_V0)
                 throw new SchemaException("Unsupported subscription version: " + version);
@@ -152,14 +151,11 @@ public interface PartitionAssignor {
         }
 
         Subscription(Short version, List<String> topics, ByteBuffer userData) {
-            this(version, topics, userData, Collections.emptyList(), Optional.empty());
+            this(version, topics, userData, Collections.emptyList());
         }
 
-        public Subscription(List<String> topics,
-                            ByteBuffer userData,
-                            List<TopicPartition> ownedPartitions,
-                            Optional<String> groupInstanceId) {
-            this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions, groupInstanceId);
+        public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
+            this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
         }
 
         public Subscription(List<String> topics, ByteBuffer userData) {
@@ -186,6 +182,10 @@ public interface PartitionAssignor {
             return userData;
         }
 
+        public void setGroupInstanceId(Optional<String> groupInstanceId) {
+            this.groupInstanceId = groupInstanceId;
+        }
+
         public Optional<String> groupInstanceId() {
             return groupInstanceId;
         }
@@ -202,11 +202,11 @@ public interface PartitionAssignor {
 
     class Assignment {
         private final Short version;
-        private final List<TopicPartition> partitions;
+        private List<TopicPartition> partitions;
         private final ByteBuffer userData;
-        private final ConsumerProtocol.Errors error;
+        private ConsumerProtocol.AssignmentError error;
 
-        Assignment(Short version, List<TopicPartition> partitions, ByteBuffer userData, ConsumerProtocol.Errors error) {
+        Assignment(Short version, List<TopicPartition> partitions, ByteBuffer userData, ConsumerProtocol.AssignmentError error) {
             this.version = version;
             this.partitions = partitions;
             this.userData = userData;
@@ -215,16 +215,12 @@ public interface PartitionAssignor {
             if (version < CONSUMER_PROTOCOL_V0)
                 throw new SchemaException("Unsupported subscription version: " + version);
 
-            if (version < CONSUMER_PROTOCOL_V1 && error != ConsumerProtocol.Errors.NONE)
+            if (version < CONSUMER_PROTOCOL_V1 && error != ConsumerProtocol.AssignmentError.NONE)
                 throw new IllegalArgumentException("Assignment version smaller than 1 should not have error code.");
         }
 
         Assignment(Short version, List<TopicPartition> partitions, ByteBuffer userData) {
-            this(version, partitions, userData, ConsumerProtocol.Errors.NONE);
-        }
-
-        public Assignment(List<TopicPartition> partitions, ByteBuffer userData, ConsumerProtocol.Errors error) {
-            this(CONSUMER_PROTOCOL_V1, partitions, userData, error);
+            this(version, partitions, userData, ConsumerProtocol.AssignmentError.NONE);
         }
 
         public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
@@ -243,10 +239,18 @@ public interface PartitionAssignor {
             return partitions;
         }
 
-        public ConsumerProtocol.Errors error() {
+        public ConsumerProtocol.AssignmentError error() {
             return error;
         }
 
+        public void updatePartitions(List<TopicPartition> partitions) {
+            this.partitions = partitions;
+        }
+
+        public void setError(ConsumerProtocol.AssignmentError error) {
+            this.error = error;
+        }
+
         public ByteBuffer userData() {
             return userData;
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index b23005e..650f670 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -221,7 +221,7 @@ public class MockClient implements KafkaClient {
                     builder.latestAllowedVersion());
             AbstractRequest abstractRequest = request.requestBuilder().build(version);
             if (!futureResp.requestMatcher.matches(abstractRequest))
-                throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest);
+                throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest + " with prepared response " + futureResp.responseBody);
 
             UnsupportedVersionException unsupportedVersionException = null;
             if (futureResp.isUnsupportedRequest)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 0f9b956..c1adf19 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1692,7 +1692,7 @@ public class KafkaConsumerTest {
                 assertTrue(protocolIterator.hasNext());
 
                 ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata());
-                PartitionAssignor.Subscription subscription = ConsumerProtocol.buildSubscription(protocolMetadata, Optional.empty());
+                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata);
                 return subscribedTopics.equals(new HashSet<>(subscription.topics()));
             }
         }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 1dc1780..a0878fb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -71,6 +71,8 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -97,6 +99,7 @@ import java.util.regex.Pattern;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
+import static org.apache.kafka.test.TestUtils.toSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -105,6 +108,7 @@ import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@RunWith(value = Parameterized.class)
 public class ConsumerCoordinatorTest {
     private final String topic1 = "test1";
     private final String topic2 = "test2";
@@ -121,8 +125,9 @@ public class ConsumerCoordinatorTest {
     private final MockTime time = new MockTime();
     private GroupRebalanceConfig rebalanceConfig;
 
-    private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
-    private List<PartitionAssignor> assignors = Collections.singletonList(partitionAssignor);
+    private final PartitionAssignor.RebalanceProtocol protocol;
+    private final MockPartitionAssignor partitionAssignor;
+    private final List<PartitionAssignor> assignors;
     private MockClient client;
     private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
         {
@@ -139,6 +144,21 @@ public class ConsumerCoordinatorTest {
     private MockCommitCallback mockOffsetCommitCallback;
     private ConsumerCoordinator coordinator;
 
+    public ConsumerCoordinatorTest(final PartitionAssignor.RebalanceProtocol protocol) {
+        this.protocol = protocol;
+        this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol));
+        this.assignors = Collections.singletonList(partitionAssignor);
+    }
+
+    @Parameterized.Parameters(name = "rebalance protocol = {0}")
+    public static Collection<Object[]> data() {
+        final List<Object[]> values = new ArrayList<>();
+        for (final PartitionAssignor.RebalanceProtocol protocol: PartitionAssignor.RebalanceProtocol.values()) {
+            values.add(new Object[]{protocol});
+        }
+        return values;
+    }
+
     @Before
     public void setup() {
         LogContext logContext = new LogContext();
@@ -177,6 +197,25 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testSelectRebalanceProtocol() {
+        List<PartitionAssignor> assignors = new ArrayList<>();
+        assignors.add(new MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.EAGER)));
+        assignors.add(new MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+
+        // no commonly supported protocols
+        assertThrows(IllegalArgumentException.class, () -> buildCoordinator(rebalanceConfig, new Metrics(), assignors, false));
+
+        assignors.clear();
+        assignors.add(new MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+        assignors.add(new MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+
+        // select higher indexed (more advanced) protocols
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
+            assertEquals(PartitionAssignor.RebalanceProtocol.COOPERATIVE, coordinator.getProtocol());
+        }
+    }
+
+    @Test
     public void testNormalHeartbeat() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -416,6 +455,9 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testNormalJoinGroupLeader() {
         final String consumerId = "leader";
+        final Set<String> subscription = singleton(topic1);
+        final List<TopicPartition> owned = Collections.emptyList();
+        final List<TopicPartition> assigned = Arrays.asList(t1p);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
@@ -427,7 +469,7 @@ public class ConsumerCoordinatorTest {
 
         // normal join group
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
-        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+        partitionAssignor.prepare(singletonMap(consumerId, assigned));
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -438,34 +480,39 @@ public class ConsumerCoordinatorTest {
                         sync.data.generationId() == 1 &&
                         sync.groupAssignments().containsKey(consumerId);
             }
-        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+        }, syncGroupResponse(assigned, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
         assertFalse(coordinator.rejoinNeededOrPending());
-        assertEquals(singleton(t1p), subscriptions.assignedPartitions());
-        assertEquals(singleton(topic1), subscriptions.groupSubscription());
+        assertEquals(toSet(assigned), subscriptions.assignedPartitions());
+        assertEquals(subscription, subscriptions.groupSubscription());
         assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
-        assertEquals(singleton(t1p), rebalanceListener.assigned);
+        assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
 
     @Test
     public void testOutdatedCoordinatorAssignment() {
         final String consumerId = "outdated_assignment";
+        final List<TopicPartition> owned = Collections.emptyList();
+        final List<String> oldSubscription = singletonList(topic2);
+        final List<TopicPartition> oldAssignment = Arrays.asList(t2p);
+        final List<String> newSubscription = singletonList(topic1);
+        final List<TopicPartition> newAssignment = Arrays.asList(t1p);
 
-        subscriptions.subscribe(singleton(topic2), rebalanceListener);
+        subscriptions.subscribe(toSet(oldSubscription), rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         // Test coordinator returning unsubscribed partitions
-        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+        partitionAssignor.prepare(singletonMap(consumerId, newAssignment));
 
         // First incorrect assignment for subscription
         client.prepareResponse(
                 joinGroupLeaderResponse(
-                    1, consumerId, singletonMap(consumerId, singletonList(topic2)), Errors.NONE));
+                    1, consumerId, singletonMap(consumerId, oldSubscription), Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -474,12 +521,12 @@ public class ConsumerCoordinatorTest {
                         sync.data.generationId() == 1 &&
                         sync.groupAssignments().containsKey(consumerId);
             }
-        }, syncGroupResponse(Arrays.asList(t2p), Errors.NONE));
+        }, syncGroupResponse(oldAssignment, Errors.NONE));
 
         // Second correct assignment for subscription
         client.prepareResponse(
                 joinGroupLeaderResponse(
-                    1, consumerId, singletonMap(consumerId, singletonList(topic1)), Errors.NONE));
+                    1, consumerId, singletonMap(consumerId, newSubscription), Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -488,29 +535,42 @@ public class ConsumerCoordinatorTest {
                         sync.data.generationId() == 1 &&
                         sync.groupAssignments().containsKey(consumerId);
             }
-        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+        }, syncGroupResponse(newAssignment, Errors.NONE));
 
         // Poll once so that the join group future gets created and complete
         coordinator.poll(time.timer(0));
 
         // Before the sync group response gets completed change the subscription
-        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        subscriptions.subscribe(toSet(newSubscription), rebalanceListener);
         coordinator.poll(time.timer(0));
 
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
+        final Collection<TopicPartition> revoked = getRevoked(owned, newAssignment);
+        final Collection<TopicPartition> assigned = getAdded(owned, newAssignment);
+
+        int revokeCount = 1;
+        final int addCount = 1;
+
+        // with eager protocol we will call revoke on the old assignment as well
+        if (protocol == PartitionAssignor.RebalanceProtocol.EAGER) {
+            revokeCount += 1;
+        }
+
         assertFalse(coordinator.rejoinNeededOrPending());
-        assertEquals(singleton(t1p), subscriptions.assignedPartitions());
-        assertEquals(singleton(topic1), subscriptions.groupSubscription());
-        assertEquals(2, rebalanceListener.revokedCount);
-        assertEquals(Collections.emptySet(), rebalanceListener.revoked);
-        assertEquals(1, rebalanceListener.assignedCount);
-        assertEquals(singleton(t1p), rebalanceListener.assigned);
+        assertEquals(toSet(newAssignment), subscriptions.assignedPartitions());
+        assertEquals(toSet(newSubscription), subscriptions.groupSubscription());
+        assertEquals(revokeCount, rebalanceListener.revokedCount);
+        assertEquals(revoked, rebalanceListener.revoked);
+        assertEquals(addCount, rebalanceListener.assignedCount);
+        assertEquals(assigned, rebalanceListener.assigned);
     }
 
     @Test
     public void testPatternJoinGroupLeader() {
         final String consumerId = "leader";
+        final List<TopicPartition> assigned = Arrays.asList(t1p, t2p);
+        final List<TopicPartition> owned = Collections.emptyList();
 
         subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
 
@@ -523,7 +583,7 @@ public class ConsumerCoordinatorTest {
 
         // normal join group
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
-        partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(t1p, t2p)));
+        partitionAssignor.prepare(singletonMap(consumerId, assigned));
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -534,7 +594,7 @@ public class ConsumerCoordinatorTest {
                         sync.data.generationId() == 1 &&
                         sync.groupAssignments().containsKey(consumerId);
             }
-        }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
+        }, syncGroupResponse(assigned, Errors.NONE));
         // expect client to force updating the metadata, if yes gives it both topics
         client.prepareMetadataUpdate(metadataResponse);
 
@@ -545,14 +605,17 @@ public class ConsumerCoordinatorTest {
         assertEquals(2, subscriptions.groupSubscription().size());
         assertEquals(2, subscriptions.subscription().size());
         assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
-        assertEquals(2, rebalanceListener.assigned.size());
+        assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
 
     @Test
     public void testMetadataRefreshDuringRebalance() {
         final String consumerId = "leader";
+        final List<TopicPartition> owned = Collections.emptyList();
+        final List<TopicPartition> oldAssigned = Arrays.asList(t1p);
+
 
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
         client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
@@ -564,11 +627,10 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         Map<String, List<String>> initialSubscription = singletonMap(consumerId, singletonList(topic1));
-        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+        partitionAssignor.prepare(singletonMap(consumerId, oldAssigned));
 
         // the metadata will be updated in flight with a new topic added
         final List<String> updatedSubscription = Arrays.asList(topic1, topic2);
-        final Set<String> updatedSubscriptionSet = new HashSet<>(updatedSubscription);
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -580,14 +642,22 @@ public class ConsumerCoordinatorTest {
                 client.updateMetadata(TestUtils.metadataUpdateWith(1, updatedPartitions));
                 return true;
             }
-        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+        }, syncGroupResponse(oldAssigned, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
-        List<TopicPartition> newAssignment = Arrays.asList(t1p, t2p);
-        Set<TopicPartition> newAssignmentSet = new HashSet<>(newAssignment);
+        // rejoin will only be set in the next poll call
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(singleton(topic1), subscriptions.subscription());
+        assertEquals(toSet(oldAssigned), subscriptions.assignedPartitions());
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(getRevoked(owned, oldAssigned), rebalanceListener.revoked);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertEquals(getAdded(owned, oldAssigned), rebalanceListener.assigned);
+
+        List<TopicPartition> newAssigned = Arrays.asList(t1p, t2p);
 
         Map<String, List<String>> updatedSubscriptions = singletonMap(consumerId, Arrays.asList(topic1, topic2));
-        partitionAssignor.prepare(singletonMap(consumerId, newAssignment));
+        partitionAssignor.prepare(singletonMap(consumerId, newAssigned));
 
         // we expect to see a second rebalance with the new-found topics
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -600,22 +670,22 @@ public class ConsumerCoordinatorTest {
                 JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
 
                 ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
-                PartitionAssignor.Subscription subscription = ConsumerProtocol.buildSubscription(metadata, Optional.empty());
+                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
                 metadata.rewind();
-                return subscription.topics().containsAll(updatedSubscriptionSet);
+                return subscription.topics().containsAll(updatedSubscription);
             }
         }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE));
-        client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(newAssigned, Errors.NONE));
 
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
         assertFalse(coordinator.rejoinNeededOrPending());
-        assertEquals(updatedSubscriptionSet, subscriptions.subscription());
-        assertEquals(newAssignmentSet, subscriptions.assignedPartitions());
+        assertEquals(toSet(updatedSubscription), subscriptions.subscription());
+        assertEquals(toSet(newAssigned), subscriptions.assignedPartitions());
         assertEquals(2, rebalanceListener.revokedCount);
-        assertEquals(singleton(t1p), rebalanceListener.revoked);
+        assertEquals(getRevoked(oldAssigned, newAssigned), rebalanceListener.revoked);
         assertEquals(2, rebalanceListener.assignedCount);
-        assertEquals(newAssignmentSet, rebalanceListener.assigned);
+        assertEquals(getAdded(oldAssigned, newAssigned), rebalanceListener.assigned);
     }
 
     @Test
@@ -665,6 +735,8 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testWakeupDuringJoin() {
         final String consumerId = "leader";
+        final List<TopicPartition> owned = Collections.emptyList();
+        final List<TopicPartition> assigned = Arrays.asList(t1p);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
@@ -675,7 +747,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
-        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+        partitionAssignor.prepare(singletonMap(consumerId, assigned));
 
         // prepare only the first half of the join and then trigger the wakeup
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
@@ -688,22 +760,25 @@ public class ConsumerCoordinatorTest {
         }
 
         // now complete the second half
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        client.prepareResponse(syncGroupResponse(assigned, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
         assertFalse(coordinator.rejoinNeededOrPending());
-        assertEquals(singleton(t1p), subscriptions.assignedPartitions());
+        assertEquals(toSet(assigned), subscriptions.assignedPartitions());
         assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
-        assertEquals(singleton(t1p), rebalanceListener.assigned);
+        assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
 
     @Test
     public void testNormalJoinGroupFollower() {
         final String consumerId = "consumer";
+        final Set<String> subscription = singleton(topic1);
+        final List<TopicPartition> owned = Collections.emptyList();
+        final List<TopicPartition> assigned = Arrays.asList(t1p);
 
-        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        subscriptions.subscribe(subscription, rebalanceListener);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -718,17 +793,17 @@ public class ConsumerCoordinatorTest {
                         sync.data.generationId() == 1 &&
                         sync.groupAssignments().isEmpty();
             }
-        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+        }, syncGroupResponse(assigned, Errors.NONE));
 
         coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
 
         assertFalse(coordinator.rejoinNeededOrPending());
-        assertEquals(singleton(t1p), subscriptions.assignedPartitions());
-        assertEquals(singleton(topic1), subscriptions.groupSubscription());
+        assertEquals(toSet(assigned), subscriptions.assignedPartitions());
+        assertEquals(subscription, subscriptions.groupSubscription());
         assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
-        assertEquals(singleton(t1p), rebalanceListener.assigned);
+        assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
 
     @Test
@@ -763,6 +838,9 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testPatternJoinGroupFollower() {
         final String consumerId = "consumer";
+        final Set<String> subscription = Utils.mkSet(topic1, topic2);
+        final List<TopicPartition> owned = Collections.emptyList();
+        final List<TopicPartition> assigned = Arrays.asList(t1p, t2p);
 
         subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
 
@@ -783,18 +861,19 @@ public class ConsumerCoordinatorTest {
                         sync.data.generationId() == 1 &&
                         sync.groupAssignments().isEmpty();
             }
-        }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
+        }, syncGroupResponse(assigned, Errors.NONE));
         // expect client to force updating the metadata, if yes gives it both topics
         client.prepareMetadataUpdate(metadataResponse);
 
         coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
 
         assertFalse(coordinator.rejoinNeededOrPending());
-        assertEquals(2, subscriptions.numAssignedPartitions());
-        assertEquals(2, subscriptions.subscription().size());
+        assertEquals(assigned.size(), subscriptions.numAssignedPartitions());
+        assertEquals(subscription, subscriptions.subscription());
         assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
-        assertEquals(2, rebalanceListener.assigned.size());
+        assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
 
     @Test
@@ -985,7 +1064,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
-        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+        partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(t1p)));
 
         // the leader is responsible for picking up metadata changes and forcing a group rebalance
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
@@ -1023,7 +1102,7 @@ public class ConsumerCoordinatorTest {
 
         // prepare initial rebalance
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, topics);
-        partitionAssignor.prepare(singletonMap(consumerId, Collections.singletonList(tp1)));
+        partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(tp1)));
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -1086,7 +1165,6 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(Collections.singletonList(partition), Errors.NONE));
 
-
         // The first call to poll should raise the exception from the rebalance listener
         try {
             coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -1172,56 +1250,58 @@ public class ConsumerCoordinatorTest {
         metadata = new ConsumerMetadata(0, Long.MAX_VALUE, includeInternalTopics,
                 false, subscriptions, new LogContext(), new ClusterResourceListeners());
         client = new MockClient(time, metadata);
-        coordinator = buildCoordinator(rebalanceConfig,
-                                       new Metrics(),
-                                       assignors,
-                                       false);
-
-        subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
-
-        Node node = new Node(0, "localhost", 9999);
-        MetadataResponse.PartitionMetadata partitionMetadata =
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
+            subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
+            Node node = new Node(0, "localhost", 9999);
+            MetadataResponse.PartitionMetadata partitionMetadata =
                 new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node, Optional.empty(),
-                        singletonList(node), singletonList(node), singletonList(node));
-        MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE,
+                    singletonList(node), singletonList(node), singletonList(node));
+            MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE,
                 Topic.GROUP_METADATA_TOPIC_NAME, true, singletonList(partitionMetadata));
 
-        client.updateMetadata(MetadataResponse.prepareResponse(singletonList(node), "clusterId", node.id(),
+            client.updateMetadata(MetadataResponse.prepareResponse(singletonList(node), "clusterId", node.id(),
                 singletonList(topicMetadata)));
-        coordinator.maybeUpdateSubscriptionMetadata();
+            coordinator.maybeUpdateSubscriptionMetadata();
 
-        assertEquals(includeInternalTopics, subscriptions.subscription().contains(Topic.GROUP_METADATA_TOPIC_NAME));
+            assertEquals(includeInternalTopics, subscriptions.subscription().contains(Topic.GROUP_METADATA_TOPIC_NAME));
+        }
     }
 
     @Test
     public void testRejoinGroup() {
         String otherTopic = "otherTopic";
+        final List<TopicPartition> owned = Collections.emptyList();
+        final List<TopicPartition> assigned = Arrays.asList(t1p);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
         // join the group once
-        joinAsFollowerAndReceiveAssignment("consumer", coordinator, singletonList(t1p));
+        joinAsFollowerAndReceiveAssignment("consumer", coordinator, assigned);
 
         assertEquals(1, rebalanceListener.revokedCount);
-        assertTrue(rebalanceListener.revoked.isEmpty());
+        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
-        assertEquals(singleton(t1p), rebalanceListener.assigned);
+        assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
 
         // and join the group again
+        rebalanceListener.revoked = null;
+        rebalanceListener.assigned = null;
         subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener);
         client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        client.prepareResponse(syncGroupResponse(assigned, Errors.NONE));
         coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
 
         assertEquals(2, rebalanceListener.revokedCount);
-        assertEquals(singleton(t1p), rebalanceListener.revoked);
+        assertEquals(getRevoked(assigned, assigned), rebalanceListener.revoked);
         assertEquals(2, rebalanceListener.assignedCount);
-        assertEquals(singleton(t1p), rebalanceListener.assigned);
+        assertEquals(getAdded(assigned, assigned), rebalanceListener.assigned);
     }
 
     @Test
     public void testDisconnectInJoin() {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        final List<TopicPartition> owned = Collections.emptyList();
+        final List<TopicPartition> assigned = Arrays.asList(t1p);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -1230,14 +1310,15 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE), true);
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+        client.prepareResponse(syncGroupResponse(assigned, Errors.NONE));
         coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
 
         assertFalse(coordinator.rejoinNeededOrPending());
-        assertEquals(singleton(t1p), subscriptions.assignedPartitions());
+        assertEquals(toSet(assigned), subscriptions.assignedPartitions());
         assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
-        assertEquals(singleton(t1p), rebalanceListener.assigned);
+        assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
 
     @Test(expected = ApiException.class)
@@ -1303,170 +1384,156 @@ public class ConsumerCoordinatorTest {
     public void testAutoCommitDynamicAssignment() {
         final String consumerId = "consumer";
 
-        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
-                                                           new Metrics(),
-                                                           assignors,
-                                                           true);
-
-        subscriptions.subscribe(singleton(topic1), rebalanceListener);
-        joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
-        subscriptions.seek(t1p, 100);
-
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
-        time.sleep(autoCommitIntervalMs);
-        coordinator.poll(time.timer(Long.MAX_VALUE));
-        assertFalse(client.hasPendingResponses());
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors,
+            true)
+        ) {
+            subscriptions.subscribe(singleton(topic1), rebalanceListener);
+            joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
+            subscriptions.seek(t1p, 100);
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+            time.sleep(autoCommitIntervalMs);
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertFalse(client.hasPendingResponses());
+        }
     }
 
     @Test
     public void testAutoCommitRetryBackoff() {
         final String consumerId = "consumer";
-        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
-                                                           new Metrics(),
-                                                           assignors,
-                                                           true);
-        subscriptions.subscribe(singleton(topic1), rebalanceListener);
-        joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
-        subscriptions.seek(t1p, 100);
-        time.sleep(autoCommitIntervalMs);
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors,
+            true)) {
+            subscriptions.subscribe(singleton(topic1), rebalanceListener);
+            joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
-        // Send an offset commit, but let it fail with a retriable error
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
-        coordinator.poll(time.timer(Long.MAX_VALUE));
-        assertTrue(coordinator.coordinatorUnknown());
+            subscriptions.seek(t1p, 100);
+            time.sleep(autoCommitIntervalMs);
 
-        // After the disconnect, we should rediscover the coordinator
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.poll(time.timer(Long.MAX_VALUE));
+            // Send an offset commit, but let it fail with a retriable error
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertTrue(coordinator.coordinatorUnknown());
 
-        subscriptions.seek(t1p, 200);
+            // After the disconnect, we should rediscover the coordinator
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.poll(time.timer(Long.MAX_VALUE));
 
-        // Until the retry backoff has expired, we should not retry the offset commit
-        time.sleep(retryBackoffMs / 2);
-        coordinator.poll(time.timer(Long.MAX_VALUE));
-        assertEquals(0, client.inFlightRequestCount());
+            subscriptions.seek(t1p, 200);
 
-        // Once the backoff expires, we should retry
-        time.sleep(retryBackoffMs / 2);
-        coordinator.poll(time.timer(Long.MAX_VALUE));
-        assertEquals(1, client.inFlightRequestCount());
-        respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE);
+            // Until the retry backoff has expired, we should not retry the offset commit
+            time.sleep(retryBackoffMs / 2);
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertEquals(0, client.inFlightRequestCount());
+
+            // Once the backoff expires, we should retry
+            time.sleep(retryBackoffMs / 2);
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertEquals(1, client.inFlightRequestCount());
+            respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE);
+        }
     }
 
     @Test
     public void testAutoCommitAwaitsInterval() {
         final String consumerId = "consumer";
-        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
-                                                           new Metrics(),
-                                                           assignors,
-                                                           true);
-        subscriptions.subscribe(singleton(topic1), rebalanceListener);
-        joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+            subscriptions.subscribe(singleton(topic1), rebalanceListener);
+            joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
-        subscriptions.seek(t1p, 100);
-        time.sleep(autoCommitIntervalMs);
+            subscriptions.seek(t1p, 100);
+            time.sleep(autoCommitIntervalMs);
 
-        // Send the offset commit request, but do not respond
-        coordinator.poll(time.timer(Long.MAX_VALUE));
-        assertEquals(1, client.inFlightRequestCount());
+            // Send the offset commit request, but do not respond
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertEquals(1, client.inFlightRequestCount());
 
-        time.sleep(autoCommitIntervalMs / 2);
+            time.sleep(autoCommitIntervalMs / 2);
 
-        // Ensure that no additional offset commit is sent
-        coordinator.poll(time.timer(Long.MAX_VALUE));
-        assertEquals(1, client.inFlightRequestCount());
+            // Ensure that no additional offset commit is sent
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertEquals(1, client.inFlightRequestCount());
 
-        respondToOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
-        coordinator.poll(time.timer(Long.MAX_VALUE));
-        assertEquals(0, client.inFlightRequestCount());
+            respondToOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertEquals(0, client.inFlightRequestCount());
 
-        subscriptions.seek(t1p, 200);
+            subscriptions.seek(t1p, 200);
 
-        // If we poll again before the auto-commit interval, there should be no new sends
-        coordinator.poll(time.timer(Long.MAX_VALUE));
-        assertEquals(0, client.inFlightRequestCount());
+            // If we poll again before the auto-commit interval, there should be no new sends
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertEquals(0, client.inFlightRequestCount());
 
-        // After the remainder of the interval passes, we send a new offset commit
-        time.sleep(autoCommitIntervalMs / 2);
-        coordinator.poll(time.timer(Long.MAX_VALUE));
-        assertEquals(1, client.inFlightRequestCount());
-        respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE);
+            // After the remainder of the interval passes, we send a new offset commit
+            time.sleep(autoCommitIntervalMs / 2);
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertEquals(1, client.inFlightRequestCount());
+            respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE);
+        }
     }
 
     @Test
     public void testAutoCommitDynamicAssignmentRebalance() {
         final String consumerId = "consumer";
 
-        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
-                                                           new Metrics(),
-                                                           assignors,
-                                                           true);
-
-        subscriptions.subscribe(singleton(topic1), rebalanceListener);
-
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors,
+            true)) {
+            subscriptions.subscribe(singleton(topic1), rebalanceListener);
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        // haven't joined, so should not cause a commit
-        time.sleep(autoCommitIntervalMs);
-        consumerClient.poll(time.timer(0));
+            // haven't joined, so should not cause a commit
+            time.sleep(autoCommitIntervalMs);
+            consumerClient.poll(time.timer(0));
 
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
+            client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+            client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+            coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
 
-        subscriptions.seek(t1p, 100);
+            subscriptions.seek(t1p, 100);
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
-        time.sleep(autoCommitIntervalMs);
-        coordinator.poll(time.timer(Long.MAX_VALUE));
-        assertFalse(client.hasPendingResponses());
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+            time.sleep(autoCommitIntervalMs);
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertFalse(client.hasPendingResponses());
+        }
     }
 
     @Test
     public void testAutoCommitManualAssignment() {
-        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
-                                                           new Metrics(),
-                                                           assignors,
-                                                           true);
-
-        subscriptions.assignFromUser(singleton(t1p));
-        subscriptions.seek(t1p, 100);
-
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+            subscriptions.assignFromUser(singleton(t1p));
+            subscriptions.seek(t1p, 100);
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
-        time.sleep(autoCommitIntervalMs);
-        coordinator.poll(time.timer(Long.MAX_VALUE));
-        assertFalse(client.hasPendingResponses());
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+            time.sleep(autoCommitIntervalMs);
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertFalse(client.hasPendingResponses());
+        }
     }
 
     @Test
     public void testAutoCommitManualAssignmentCoordinatorUnknown() {
-        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
-                                                           new Metrics(),
-                                                           assignors,
-                                                           true);
-
-        subscriptions.assignFromUser(singleton(t1p));
-        subscriptions.seek(t1p, 100);
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+            subscriptions.assignFromUser(singleton(t1p));
+            subscriptions.seek(t1p, 100);
 
-        // no commit initially since coordinator is unknown
-        consumerClient.poll(time.timer(0));
-        time.sleep(autoCommitIntervalMs);
-        consumerClient.poll(time.timer(0));
+            // no commit initially since coordinator is unknown
+            consumerClient.poll(time.timer(0));
+            time.sleep(autoCommitIntervalMs);
+            consumerClient.poll(time.timer(0));
 
-        // now find the coordinator
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+            // now find the coordinator
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        // sleep only for the retry backoff
-        time.sleep(retryBackoffMs);
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
-        coordinator.poll(time.timer(Long.MAX_VALUE));
-        assertFalse(client.hasPendingResponses());
+            // sleep only for the retry backoff
+            time.sleep(retryBackoffMs);
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+            assertFalse(client.hasPendingResponses());
+        }
     }
 
     @Test
@@ -2001,115 +2068,125 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCloseDynamicAssignment() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty());
-        gracefulCloseTest(coordinator, true);
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty())) {
+            gracefulCloseTest(coordinator, true);
+        }
     }
 
     @Test
     public void testCloseManualAssignment() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty());
-        gracefulCloseTest(coordinator, false);
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty())) {
+            gracefulCloseTest(coordinator, false);
+        }
     }
 
     @Test
     public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty());
-        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
-        time.sleep(autoCommitIntervalMs);
-        closeVerifyTimeout(coordinator, 1000, 1000, 1000);
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty())) {
+            makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
+            time.sleep(autoCommitIntervalMs);
+            closeVerifyTimeout(coordinator, 1000, 1000, 1000);
+        }
     }
 
     @Test
     public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty());
-        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
-        closeVerifyTimeout(coordinator, 1000, 0, 0);
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty())) {
+            makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
+            closeVerifyTimeout(coordinator, 1000, 0, 0);
+        }
     }
 
     @Test
     public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
-        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
-        time.sleep(autoCommitIntervalMs);
-        closeVerifyTimeout(coordinator, 1000, 1000, 1000);
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty())) {
+            makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
+            time.sleep(autoCommitIntervalMs);
+            closeVerifyTimeout(coordinator, 1000, 1000, 1000);
+        }
     }
 
     @Test
     public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty());
-        makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
-        closeVerifyTimeout(coordinator, 1000, 0, 0);
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty())) {
+            makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
+            closeVerifyTimeout(coordinator, 1000, 0, 0);
+        }
     }
 
     @Test
     public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
-        makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
-        time.sleep(autoCommitIntervalMs);
-        closeVerifyTimeout(coordinator, 1000, 1000, 1000);
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId)) {
+            makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
+            time.sleep(autoCommitIntervalMs);
+            closeVerifyTimeout(coordinator, 1000, 1000, 1000);
+        }
     }
 
     @Test
     public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
-        makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
-        time.sleep(autoCommitIntervalMs);
-        closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId)) {
+            makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
+            time.sleep(autoCommitIntervalMs);
+            closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+        }
     }
 
     @Test
     public void testCloseNoResponseForCommit() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
-        time.sleep(autoCommitIntervalMs);
-        closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId)) {
+            time.sleep(autoCommitIntervalMs);
+            closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+        }
     }
 
     @Test
     public void testCloseNoResponseForLeaveGroup() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty());
-        closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty())) {
+            closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+        }
     }
 
     @Test
     public void testCloseNoWait() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
-        time.sleep(autoCommitIntervalMs);
-        closeVerifyTimeout(coordinator, 0, 0, 0);
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId)) {
+            time.sleep(autoCommitIntervalMs);
+            closeVerifyTimeout(coordinator, 0, 0, 0);
+        }
     }
 
     @Test
     public void testHeartbeatThreadClose() throws Exception {
-        ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId);
-        coordinator.ensureActiveGroup();
-        time.sleep(heartbeatIntervalMs + 100);
-        Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
-        closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
-        Thread[] threads = new Thread[Thread.activeCount()];
-        int threadCount = Thread.enumerate(threads);
-        for (int i = 0; i < threadCount; i++)
-            assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId));
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, groupInstanceId)) {
+            coordinator.ensureActiveGroup();
+            time.sleep(heartbeatIntervalMs + 100);
+            Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
+            closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs);
+            Thread[] threads = new Thread[Thread.activeCount()];
+            int threadCount = Thread.enumerate(threads);
+            for (int i = 0; i < threadCount; i++) {
+                assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId));
+            }
+        }
     }
 
     @Test
     public void testAutoCommitAfterCoordinatorBackToService() {
-        ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
-                                                           new Metrics(),
-                                                           assignors,
-                                                           true);
-
-        subscriptions.assignFromUser(Collections.singleton(t1p));
-        subscriptions.seek(t1p, 100L);
-
-        coordinator.markCoordinatorUnknown();
-        assertTrue(coordinator.coordinatorUnknown());
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
-
-        // async commit offset should find coordinator
-        time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen
-        coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
-        assertFalse(coordinator.coordinatorUnknown());
-        assertEquals(100L, subscriptions.position(t1p).offset);
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
+            subscriptions.assignFromUser(Collections.singleton(t1p));
+            subscriptions.seek(t1p, 100L);
+
+            coordinator.markCoordinatorUnknown();
+            assertTrue(coordinator.coordinatorUnknown());
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+
+            // async commit offset should find coordinator
+            time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen
+            coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+            assertFalse(coordinator.coordinatorUnknown());
+            assertEquals(100L, subscriptions.position(t1p).offset);
+        }
     }
 
     @Test(expected = FencedInstanceIdException.class)
@@ -2266,6 +2343,34 @@ public class ConsumerCoordinatorTest {
                 null);
     }
 
+    private Collection<TopicPartition> getRevoked(final List<TopicPartition> owned,
+                                                  final List<TopicPartition> assigned) {
+        switch (protocol) {
+            case EAGER:
+                return toSet(owned);
+            case COOPERATIVE:
+                final List<TopicPartition> revoked = new ArrayList<>(owned);
+                revoked.removeAll(assigned);
+                return toSet(revoked);
+            default:
+                throw new IllegalStateException("Unknown rebalance protocol: " + protocol);
+        }
+    }
+
+    private Collection<TopicPartition> getAdded(final List<TopicPartition> owned,
+                                                final List<TopicPartition> assigned) {
+        switch (protocol) {
+            case EAGER:
+                return toSet(assigned);
+            case COOPERATIVE:
+                final List<TopicPartition> added = new ArrayList<>(assigned);
+                added.removeAll(owned);
+                return toSet(added);
+            default:
+                throw new IllegalStateException("Unknown rebalance protocol: " + protocol);
+        }
+    }
+
     private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
         return FindCoordinatorResponse.prepareResponse(error, node);
     }
@@ -2294,7 +2399,7 @@ public class ConsumerCoordinatorTest {
                         .setProtocolName(partitionAssignor.name())
                         .setLeader(memberId)
                         .setMemberId(memberId)
-                        .setMembers(Collections.emptyList())
+                        .setMembers(metadata)
         );
     }
 
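The getRevoked/getAdded helpers above capture the listener-callback semantics the protocol-parameterized assertions rely on: under EAGER every previously owned partition is reported as revoked and the entire new assignment as added, while under COOPERATIVE only the set difference moves in either direction. A minimal standalone sketch of that computation (illustrative only; the class name and TopicPartition values are made up and not part of the patch):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;

    public class RevokedAddedSketch {
        public static void main(String[] args) {
            List<TopicPartition> owned = Arrays.asList(new TopicPartition("t1", 0), new TopicPartition("t1", 1));
            List<TopicPartition> assigned = Arrays.asList(new TopicPartition("t1", 1), new TopicPartition("t2", 0));

            // EAGER: revoke everything previously owned, report the whole new assignment as added.
            Set<TopicPartition> eagerRevoked = new HashSet<>(owned);       // {t1-0, t1-1}
            Set<TopicPartition> eagerAdded = new HashSet<>(assigned);      // {t1-1, t2-0}

            // COOPERATIVE: only partitions that actually change hands are reported.
            Set<TopicPartition> coopRevoked = new HashSet<>(owned);
            coopRevoked.removeAll(assigned);                               // {t1-0}
            Set<TopicPartition> coopAdded = new HashSet<>(assigned);
            coopAdded.removeAll(owned);                                    // {t2-0}

            System.out.println(eagerRevoked + " / " + eagerAdded);
            System.out.println(coopRevoked + " / " + coopAdded);
        }
    }
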
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index d89c5a7..9e601b0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.consumer.internals.ConsumerProtocol.Errors;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
@@ -29,12 +28,9 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
-import java.util.Set;
 
 import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA;
 import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.OWNED_PARTITIONS_KEY_NAME;
@@ -44,6 +40,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC
 import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_DATA_KEY_NAME;
 import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.VERSION_KEY_NAME;
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
+import static org.apache.kafka.test.TestUtils.toSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -59,7 +56,7 @@ public class ConsumerProtocolTest {
     public void serializeDeserializeMetadata() {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
-        Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, Optional.empty());
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertEquals(0, parsedSubscription.userData().limit());
         assertFalse(parsedSubscription.groupInstanceId().isPresent());
@@ -70,8 +67,10 @@ public class ConsumerProtocolTest {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
 
-        Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, groupInstanceId);
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
+        parsedSubscription.setGroupInstanceId(groupInstanceId);
         assertEquals(subscription.topics(), parsedSubscription.topics());
+        assertEquals(0, parsedSubscription.userData().limit());
         assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
     }
 
@@ -79,34 +78,29 @@ public class ConsumerProtocolTest {
     public void serializeDeserializeNullSubscriptionUserData() {
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
-        Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, Optional.empty());
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
-        assertFalse(parsedSubscription.groupInstanceId().isPresent());
     }
 
     @Test
     public void deserializeOldSubscriptionVersion() {
         Subscription subscription = new Subscription((short) 0, Arrays.asList("foo", "bar"), null);
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
-        Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, groupInstanceId);
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
-        assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
     }
 
     @Test
     public void deserializeNewSubscriptionWithOldVersion() {
-        Subscription subscription = new Subscription((short) 1,
-                                                     Arrays.asList("foo", "bar"),
-                                                     null, Collections.singletonList(tp2),
-                                                     Optional.empty());
+        Subscription subscription = new Subscription((short) 1, Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2));
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
         // ignore the version assuming it is the old byte code, as it will blindly deserialize as V0
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         header.getShort(VERSION_KEY_NAME);
-        Subscription parsedSubscription = ConsumerProtocol.buildSubscriptionV0(buffer, Optional.empty());
+        Subscription parsedSubscription = ConsumerProtocol.deserializeSubscriptionV0(buffer);
         assertEquals(subscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
@@ -141,10 +135,11 @@ public class ConsumerProtocolTest {
 
         buffer.flip();
 
-        Subscription parsedSubscription = ConsumerProtocol.buildSubscription(buffer, groupInstanceId);
-        assertEquals(Collections.singletonList("topic"), parsedSubscription.topics());
-        assertEquals(Collections.singletonList(tp2), parsedSubscription.ownedPartitions());
-        assertEquals(groupInstanceId, parsedSubscription.groupInstanceId());
+        Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer);
+        subscription.setGroupInstanceId(groupInstanceId);
+        assertEquals(Collections.singletonList("topic"), subscription.topics());
+        assertEquals(Collections.singletonList(tp2), subscription.ownedPartitions());
+        assertEquals(groupInstanceId, subscription.groupInstanceId());
     }
 
     @Test
@@ -172,20 +167,20 @@ public class ConsumerProtocolTest {
         Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer);
         assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
         assertNull(parsedAssignment.userData());
-        assertEquals(Errors.NONE, parsedAssignment.error());
+        assertEquals(ConsumerProtocol.AssignmentError.NONE, parsedAssignment.error());
     }
 
     @Test
     public void deserializeNewAssignmentWithOldVersion() {
         List<TopicPartition> partitions = Collections.singletonList(tp1);
-        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment((short) 1, partitions, null, Errors.NEED_REJOIN));
+        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment((short) 1, partitions, null, ConsumerProtocol.AssignmentError.NEED_REJOIN));
         // ignore the version assuming it is the old byte code, as it will blindly deserialize as 0
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         header.getShort(VERSION_KEY_NAME);
         Assignment parsedAssignment = ConsumerProtocol.deserializeAssignmentV0(buffer);
         assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
         assertNull(parsedAssignment.userData());
-        assertEquals(Errors.NONE, parsedAssignment.error());
+        assertEquals(ConsumerProtocol.AssignmentError.NONE, parsedAssignment.error());
     }
 
     @Test
@@ -205,7 +200,7 @@ public class ConsumerProtocolTest {
                         .set(ConsumerProtocol.TOPIC_KEY_NAME, tp1.topic())
                         .set(ConsumerProtocol.PARTITIONS_KEY_NAME, new Object[]{tp1.partition()})});
         assignmentV100.set(USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0]));
-        assignmentV100.set(ERROR_CODE.name, Errors.NEED_REJOIN.code());
+        assignmentV100.set(ERROR_CODE.name, ConsumerProtocol.AssignmentError.NEED_REJOIN.code());
         assignmentV100.set("foo", "bar");
 
         Struct headerV100 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA);
@@ -219,10 +214,6 @@ public class ConsumerProtocolTest {
 
         PartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer);
         assertEquals(toSet(Collections.singletonList(tp1)), toSet(assignment.partitions()));
-        assertEquals(Errors.NEED_REJOIN, assignment.error());
-    }
-
-    private static <T> Set<T> toSet(Collection<T> collection) {
-        return new HashSet<>(collection);
+        assertEquals(ConsumerProtocol.AssignmentError.NEED_REJOIN, assignment.error());
     }
 }
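The updated ConsumerProtocolTest cases above exercise the new two-step flow: deserializeSubscription no longer takes the group instance id, which is instead attached to the parsed Subscription afterwards. A rough sketch of that round trip, assuming only the serializeSubscription, deserializeSubscription and setGroupInstanceId signatures visible in the hunks (the class name and instance id value are illustrative):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.Optional;

    import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
    import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;

    public class SubscriptionRoundTripSketch {
        public static void main(String[] args) {
            // Serialize a subscription for two topics, as the tests do.
            ByteBuffer buffer = ConsumerProtocol.serializeSubscription(
                    new Subscription(Arrays.asList("foo", "bar")));

            // The instance id is no longer threaded through deserialization;
            // it is set on the parsed Subscription after the fact.
            Subscription parsed = ConsumerProtocol.deserializeSubscription(buffer);
            parsed.setGroupInstanceId(Optional.of("instance-1"));

            System.out.println(parsed.topics() + " " + parsed.groupInstanceId());
        }
    }
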
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
index 609c773..ca7cb34 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
@@ -23,8 +23,14 @@ import java.util.Map;
 
 public class MockPartitionAssignor extends AbstractPartitionAssignor {
 
+    private final List<RebalanceProtocol> supportedProtocols;
+
     private Map<String, List<TopicPartition>> result = null;
 
+    MockPartitionAssignor(final List<RebalanceProtocol> supportedProtocols) {
+        this.supportedProtocols = supportedProtocols;
+    }
+
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
@@ -38,6 +44,11 @@ public class MockPartitionAssignor extends AbstractPartitionAssignor {
         return "consumer-mock-assignor";
     }
 
+    @Override
+    public List<RebalanceProtocol> supportedProtocols() {
+        return supportedProtocols;
+    }
+
     public void clear() {
         this.result = null;
     }
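With the constructor added above, each test run can pin MockPartitionAssignor to a single supported protocol. A plausible construction, as a sketch only (it would need to live in the org.apache.kafka.clients.consumer.internals package since the constructor is package-private, and the actual test wiring may differ):

    import java.util.Collections;
    import java.util.List;

    import org.apache.kafka.clients.consumer.internals.PartitionAssignor.RebalanceProtocol;

    public class MockAssignorProtocolSketch {
        public static void main(String[] args) {
            // Pin the mock assignor to one rebalance protocol for this run.
            List<RebalanceProtocol> supported = Collections.singletonList(RebalanceProtocol.COOPERATIVE);
            MockPartitionAssignor assignor = new MockPartitionAssignor(supported);

            System.out.println(assignor.name() + " supports " + assignor.supportedProtocols());
        }
    }
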
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 250ebc0..4d35f06 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -41,6 +41,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Base64;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -434,6 +435,10 @@ public class TestUtils {
         return list;
     }
 
+    public static <T> Set<T> toSet(Collection<T> collection) {
+        return new HashSet<>(collection);
+    }
+
     public static ByteBuffer toBuffer(Struct struct) {
         ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
         struct.writeTo(buffer);