You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/05/31 06:29:58 UTC

[kafka] branch trunk updated: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup (#13639)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 49d9c6775de KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup (#13639)
49d9c6775de is described below

commit 49d9c6775dedd1abcf45683d6c9e3d9c6c05db9c
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed May 31 08:29:41 2023 +0200

    KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup (#13639)
    
    This patch adds the GroupMetadataManager to the group-coordinator module. This manager is responsible for group management, member management and the entire reconciliation process. At this point, only the new consumer group type/protocol is implemented.
    
    The new manager is based on an architecture inspired by the quorum controller. A request can access/read the state but can't mutate it directly. Instead, a list of records is generated together with the response and those records are applied to the state by the runtime framework. We use timeline data structures. Note that the runtime framework is not part of this patch. It will come in a following one.
    
    Reviewers: Jeff Kim <je...@confluent.io>, Justine Olshan <jo...@confluent.io>
---
 checkstyle/import-control.xml                      |    3 +
 checkstyle/suppressions.xml                        |    6 +-
 .../coordinator/group/GroupMetadataManager.java    |  876 +++++++++
 .../kafka/coordinator/group/RecordHelpers.java     |    2 +-
 .../coordinator/group/consumer/ConsumerGroup.java  |  622 ++++++
 .../group/consumer/ConsumerGroupMember.java        |   35 +-
 .../group/consumer/CurrentAssignmentBuilder.java   |   10 +-
 .../group/GroupMetadataManagerTest.java            | 2073 ++++++++++++++++++++
 .../kafka/coordinator/group/RecordHelpersTest.java |    2 +-
 .../group/consumer/ConsumerGroupMemberTest.java    |   12 +-
 .../group/consumer/ConsumerGroupTest.java          |  544 +++++
 .../consumer/CurrentAssignmentBuilderTest.java     |   44 +-
 12 files changed, 4175 insertions(+), 54 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index fd74add9faa..a3d40ff309e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -224,12 +224,15 @@
       <allow pkg="org.apache.kafka.clients.consumer" />
       <allow pkg="org.apache.kafka.common.annotation" />
       <allow pkg="org.apache.kafka.common.message" />
+      <allow pkg="org.apache.kafka.common.metadata" />
+      <allow pkg="org.apache.kafka.common.network" />
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.requests" />
       <allow pkg="org.apache.kafka.coordinator.group" />
       <allow pkg="org.apache.kafka.image"/>
       <allow pkg="org.apache.kafka.server.common"/>
       <allow pkg="org.apache.kafka.server.util"/>
+      <allow pkg="org.apache.kafka.timeline"/>
     </subpackage>
   </subpackage>
 
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c2fa6d4a47d..6dd2fa0c492 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -321,7 +321,11 @@
 
     <!-- group coordinator -->
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerGroupMember).java"/>
+              files="(ConsumerGroupMember|GroupMetadataManager).java"/>
+    <suppress checks="MethodLength"
+              files="(ConsumerGroupTest|GroupMetadataManagerTest).java"/>
+    <suppress checks="NPathComplexity"
+              files="(GroupMetadataManager).java"/>
     <suppress checks="ParameterNumber"
               files="(ConsumerGroupMember).java"/>
     <suppress checks="ClassDataAbstractionCouplingCheck"
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
new file mode 100644
index 00000000000..c05f7e3d9a5
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -0,0 +1,876 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class GroupMetadataManager {
+
+    /**
+     * Builder of {@link GroupMetadataManager}. Only the assignors are mandatory; every
+     * other setting falls back to a default when it is not provided.
+     */
+    public static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+        private List<PartitionAssignor> assignors = null;
+        private TopicsImage topicsImage = null;
+        // Values used when the corresponding setter is never called.
+        private int consumerGroupMaxSize = Integer.MAX_VALUE;
+        private int consumerGroupHeartbeatIntervalMs = 5000;
+
+        /**
+         * Sets the log context. A new LogContext is used by build() when this is not set.
+         */
+        Builder withLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        /**
+         * Sets the snapshot registry. A new SnapshotRegistry is created by build() when
+         * this is not set.
+         */
+        Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        /**
+         * Sets the supported partition assignors. The list must be non-null and non-empty
+         * when build() is called.
+         */
+        Builder withAssignors(List<PartitionAssignor> assignors) {
+            this.assignors = assignors;
+            return this;
+        }
+
+        /**
+         * Sets the maximum number of members allowed in a single consumer group.
+         */
+        Builder withConsumerGroupMaxSize(int consumerGroupMaxSize) {
+            this.consumerGroupMaxSize = consumerGroupMaxSize;
+            return this;
+        }
+
+        /**
+         * Sets the heartbeat interval, in milliseconds, for consumer groups.
+         */
+        Builder withConsumerGroupHeartbeatInterval(int consumerGroupHeartbeatIntervalMs) {
+            this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+            return this;
+        }
+
+        /**
+         * Sets the topics image. TopicsImage.EMPTY is used by build() when this is not set.
+         */
+        Builder withTopicsImage(TopicsImage topicsImage) {
+            this.topicsImage = topicsImage;
+            return this;
+        }
+
+        /**
+         * Builds the GroupMetadataManager. A missing log context, snapshot registry or
+         * topics image is replaced by a default one, which is also stored in this builder.
+         *
+         * @return The new GroupMetadataManager.
+         * @throws IllegalStateException if the assignors are null or empty.
+         */
+        GroupMetadataManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+            if (topicsImage == null) topicsImage = TopicsImage.EMPTY;
+
+            // The manager needs at least one assignor.
+            if (assignors == null || assignors.isEmpty()) {
+                throw new IllegalStateException("Assignors must be set before building.");
+            }
+
+            return new GroupMetadataManager(
+                snapshotRegistry,
+                logContext,
+                assignors,
+                topicsImage,
+                consumerGroupMaxSize,
+                consumerGroupHeartbeatIntervalMs
+            );
+        }
+    }
+
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The supported partition assignors keyed by their name.
+     */
+    private final Map<String, PartitionAssignor> assignors;
+
+    /**
+     * The default assignor used.
+     */
+    private final PartitionAssignor defaultAssignor;
+
+    /**
+     * The generic and consumer groups keyed by their name.
+     */
+    private final TimelineHashMap<String, Group> groups;
+
+    /**
+     * The maximum number of members allowed in a single consumer group.
+     */
+    private final int consumerGroupMaxSize;
+
+    /**
+     * The heartbeat interval for consumer groups.
+     */
+    private final int consumerGroupHeartbeatIntervalMs;
+
+    /**
+     * The topics metadata (or image).
+     */
+    private TopicsImage topicsImage;
+
+    private GroupMetadataManager(
+        SnapshotRegistry snapshotRegistry,
+        LogContext logContext,
+        List<PartitionAssignor> assignors,
+        TopicsImage topicsImage,
+        int consumerGroupMaxSize,
+        int consumerGroupHeartbeatIntervalMs
+    ) {
+        this.log = logContext.logger(GroupMetadataManager.class);
+
+        // Index the supported assignors by their name. The first assignor of the
+        // provided list is used as the default one.
+        Map<String, PartitionAssignor> assignorsByName = assignors.stream()
+            .collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
+        this.assignors = assignorsByName;
+        this.defaultAssignor = assignors.get(0);
+
+        // The groups are kept in a timeline map attached to the snapshot registry.
+        this.snapshotRegistry = snapshotRegistry;
+        this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
+
+        // Remaining settings are stored as provided.
+        this.topicsImage = topicsImage;
+        this.consumerGroupMaxSize = consumerGroupMaxSize;
+        this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+    }
+
+    /**
+     * Gets or maybe creates a consumer group.
+     *
+     * @param groupId           The group id.
+     * @param createIfNotExists A boolean indicating whether the group should be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroup.
+     * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
+     *                                  if the group is not a consumer group.
+     *
+     * Package private for testing.
+     */
+    ConsumerGroup getOrMaybeCreateConsumerGroup(
+        String groupId,
+        boolean createIfNotExists
+    ) throws GroupIdNotFoundException {
+        Group group = groups.get(groupId);
+
+        if (group == null && !createIfNotExists) {
+            throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
+        }
+
+        if (group == null) {
+            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
+            groups.put(groupId, consumerGroup);
+            return consumerGroup;
+        } else {
+            if (group.type() == Group.GroupType.CONSUMER) {
+                return (ConsumerGroup) group;
+            } else {
+                // We don't support upgrading/downgrading between protocols at the moment so
+                // we throw an exception if a group exists with the wrong type.
+                throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
+            }
+        }
+    }
+
+    /**
+     * Removes the group.
+     *
+     * @param groupId The group id.
+     */
+    private void removeGroup(String groupId) {
+        // Drop the group, whatever its type, from the groups map.
+        groups.remove(groupId);
+    }
+
+    /**
+     * Throws an InvalidRequestException if the value is non-null and empty.
+     *
+     * @param value The value.
+     * @param error The error message.
+     * @throws InvalidRequestException
+     */
+    private void throwIfEmptyString(
+        String value,
+        String error
+    ) throws InvalidRequestException {
+        // A null value is accepted; only a present but empty string is rejected.
+        if (value == null || !value.isEmpty()) {
+            return;
+        }
+
+        throw new InvalidRequestException(error);
+    }
+
+    /**
+     * Throws an InvalidRequestException if the value is non-null.
+     *
+     * @param value The value.
+     * @param error The error message.
+     * @throws InvalidRequestException
+     */
+    private void throwIfNotNull(
+        Object value,
+        String error
+    ) throws InvalidRequestException {
+        // Only an absent (null) value is accepted.
+        if (value == null) {
+            return;
+        }
+
+        throw new InvalidRequestException(error);
+    }
+
+    /**
+     * Validates the request.
+     *
+     * @param request The request to validate.
+     *
+     * @throws InvalidRequestException if the request is not valid.
+     * @throws UnsupportedAssignorException if the assignor is not supported.
+     */
+    private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
+        ConsumerGroupHeartbeatRequestData request
+    ) throws InvalidRequestException, UnsupportedAssignorException {
+        throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
+        throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
+        throwIfEmptyString(request.rackId(), "RackId can't be empty.");
+        throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex is not supported yet.");
+        throwIfNotNull(request.clientAssignors(), "Client side assignors are not supported yet.");
+
+        if (request.memberEpoch() > 0 || request.memberEpoch() == -1) {
+            throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+        } else if (request.memberEpoch() == 0) {
+            if (request.rebalanceTimeoutMs() == -1) {
+                throw new InvalidRequestException("RebalanceTimeoutMs must be provided in first request.");
+            }
+            if (request.topicPartitions() == null || !request.topicPartitions().isEmpty()) {
+                throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
+            }
+            if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
+                throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
+            }
+        } else {
+            throw new InvalidRequestException("MemberEpoch is invalid.");
+        }
+
+        if (request.serverAssignor() != null && !assignors.containsKey(request.serverAssignor())) {
+            throw new UnsupportedAssignorException("ServerAssignor " + request.serverAssignor()
+                + " is not supported. Supported assignors: " + String.join(", ", assignors.keySet())
+                + ".");
+        }
+    }
+
+    /**
+     * Verifies that the partitions currently owned by the member (the ones set in the
+     * request) match the ones that the member should own. They match if the consumer
+     * only owns partitions which are in the assigned partitions. They do not match if
+     * it owns any other partitions.
+     *
+     * @param ownedTopicPartitions  The partitions provided by the consumer in the request.
+     * @param target                The partitions that the member should have.
+     *
+     * @return A boolean indicating whether the owned partitions are a subset or not.
+     */
+    private boolean isSubset(
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
+        Map<Uuid, Set<Integer>> target
+    ) {
+        // Without any reported partitions, nothing can be verified.
+        if (ownedTopicPartitions == null) {
+            return false;
+        }
+
+        // Every owned topic must be part of the target and every owned partition
+        // of that topic must be part of the target partitions.
+        return ownedTopicPartitions.stream().allMatch(owned -> {
+            Set<Integer> targetPartitions = target.get(owned.topicId());
+            return targetPartitions != null && targetPartitions.containsAll(owned.partitions());
+        });
+    }
+
+    /**
+     * Checks whether the consumer group can accept a new member or not based on the
+     * max group size defined.
+     *
+     * @param group     The consumer group.
+     * @param memberId  The member id.
+     *
+     * @throws GroupMaxSizeReachedException if the maximum capacity has been reached.
+     */
+    private void throwIfConsumerGroupIsFull(
+        ConsumerGroup group,
+        String memberId
+    ) throws GroupMaxSizeReachedException {
+        // There is still room in the group.
+        if (group.numMembers() < consumerGroupMaxSize) {
+            return;
+        }
+
+        // The group has reached its maximum capacity. Existing members are still accepted.
+        boolean isExistingMember = !memberId.isEmpty() && group.hasMember(memberId);
+        if (isExistingMember) {
+            return;
+        }
+
+        throw new GroupMaxSizeReachedException("The consumer group has reached its maximum capacity of "
+            + consumerGroupMaxSize + " members.");
+    }
+
+    /**
+     * Validates the member epoch provided in the heartbeat request.
+     *
+     * @param member                The consumer group member.
+     * @param receivedMemberEpoch   The member epoch.
+     * @param ownedTopicPartitions  The owned partitions.
+     *
+     * @throws FencedMemberEpochException if the provided epoch is ahead of the epoch known
+     *                                    by this coordinator, or if it is behind it, unless
+     *                                    the provided epoch is the member's previous epoch
+     *                                    and the owned partitions are a subset of the
+     *                                    partitions assigned to the member.
+     */
+    private void throwIfMemberEpochIsInvalid(
+        ConsumerGroupMember member,
+        int receivedMemberEpoch,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
+    ) {
+        if (receivedMemberEpoch > member.memberEpoch()) {
+            throw new FencedMemberEpochException("The consumer group member has a greater member "
+                + "epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator ("
+                + member.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
+        } else if (receivedMemberEpoch < member.memberEpoch()) {
+            // If the member comes with the previous epoch and has a subset of the current assignment partitions,
+            // we accept it because the response with the bumped epoch may have been lost.
+            if (receivedMemberEpoch != member.previousMemberEpoch() || !isSubset(ownedTopicPartitions, member.assignedPartitions())) {
+                throw new FencedMemberEpochException("The consumer group member has a smaller member "
+                    + "epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator ("
+                    + member.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
+            }
+        }
+    }
+
+    /**
+     * Creates the assignment of the heartbeat response from the member's state.
+     *
+     * @param member The consumer group member.
+     *
+     * @return The assignment with the member's assigned partitions. The partitions
+     *         pending assignment are also set when the member is in the ASSIGNING state.
+     */
+    private ConsumerGroupHeartbeatResponseData.Assignment createResponseAssignment(
+        ConsumerGroupMember member
+    ) {
+        ConsumerGroupHeartbeatResponseData.Assignment responseAssignment =
+            new ConsumerGroupHeartbeatResponseData.Assignment();
+        responseAssignment.setAssignedTopicPartitions(fromAssignmentMap(member.assignedPartitions()));
+
+        boolean isAssigning = member.state() == ConsumerGroupMember.MemberState.ASSIGNING;
+        if (isAssigning) {
+            responseAssignment.setPendingTopicPartitions(fromAssignmentMap(member.partitionsPendingAssignment()));
+        }
+
+        return responseAssignment;
+    }
+
+    /**
+     * Converts an assignment keyed by topic id to the list of TopicPartitions used
+     * in the heartbeat response.
+     *
+     * @param assignment The partitions keyed by topic id.
+     *
+     * @return One TopicPartitions per entry of the map, holding a copy of its partitions.
+     */
+    private List<ConsumerGroupHeartbeatResponseData.TopicPartitions> fromAssignmentMap(
+        Map<Uuid, Set<Integer>> assignment
+    ) {
+        List<ConsumerGroupHeartbeatResponseData.TopicPartitions> topicPartitionsList = new ArrayList<>();
+
+        for (Map.Entry<Uuid, Set<Integer>> topicEntry : assignment.entrySet()) {
+            ConsumerGroupHeartbeatResponseData.TopicPartitions topicPartitions =
+                new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                    .setTopicId(topicEntry.getKey())
+                    .setPartitions(new ArrayList<>(topicEntry.getValue()));
+            topicPartitionsList.add(topicPartitions);
+        }
+
+        return topicPartitionsList;
+    }
+
+    /**
+     * Converts a value which uses -1 as "not provided" to an OptionalInt.
+     *
+     * @param value The value or -1.
+     *
+     * @return An empty OptionalInt if the value is -1; an OptionalInt of the value otherwise.
+     */
+    private OptionalInt ofSentinel(int value) {
+        if (value == -1) {
+            return OptionalInt.empty();
+        }
+        return OptionalInt.of(value);
+    }
+
+    /**
+     * Handles a regular heartbeat from a consumer group member. It mainly consists of
+     * three parts:
+     * 1) The member is created or updated. The group epoch is bumped if the member
+     *    has been created or updated.
+     * 2) The target assignment for the consumer group is updated if the group epoch
+     *    is larger than the current target assignment epoch.
+     * 3) The member's assignment is reconciled with the target assignment.
+     *
+     * @param groupId               The group id from the request.
+     * @param memberId              The member id from the request.
+     * @param memberEpoch           The member epoch from the request.
+     * @param instanceId            The instance id from the request or null.
+     * @param rackId                The rack id from the request or null.
+     * @param rebalanceTimeoutMs    The rebalance timeout from the request or -1.
+     * @param clientId              The client id.
+     * @param clientHost            The client host.
+     * @param subscribedTopicNames  The list of subscribed topic names from the request
+     *                              or null.
+     * @param subscribedTopicRegex  The regular expression based subscription from the
+     *                              request or null.
+     * @param assignorName          The assignor name from the request or null.
+     * @param ownedTopicPartitions  The list of owned partitions from the request or null.
+     *
+     * @return A Result containing the ConsumerGroupHeartbeat response and
+     *         a list of records to update the state machine.
+     */
+    private Result<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
+        String groupId,
+        String memberId,
+        int memberEpoch,
+        String instanceId,
+        String rackId,
+        int rebalanceTimeoutMs,
+        String clientId,
+        String clientHost,
+        List<String> subscribedTopicNames,
+        String subscribedTopicRegex,
+        String assignorName,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
+    ) throws ApiException {
+        // The records generated while handling the request. They are returned with the
+        // response; nothing is written from this method.
+        List<Record> records = new ArrayList<>();
+
+        // Get or create the consumer group. The group, like the member below, may only
+        // be created when the member epoch is zero.
+        boolean createIfNotExists = memberEpoch == 0;
+        final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, createIfNotExists);
+        throwIfConsumerGroupIsFull(group, memberId);
+
+        // Get or create the member. An empty member id is replaced by a newly generated one.
+        if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
+        final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+        throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+
+        if (memberEpoch == 0) {
+            log.info("[GroupId " + groupId + "] Member " + memberId + " joins the consumer group.");
+        }
+
+        // 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue
+        // record is written to the __consumer_offsets partition to persist the change. If the subscriptions have
+        // changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue
+        // record to the __consumer_offsets partition. Finally, the group epoch is bumped if the subscriptions have
+        // changed, and persisted by writing a ConsumerGroupMetadataValue record to the partition.
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
+        ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(member)
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(Optional.ofNullable(rackId))
+            .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
+            .maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
+            .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+            .maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))
+            .setClientId(clientId)
+            .setClientHost(clientHost)
+            .build();
+
+        if (!updatedMember.equals(member)) {
+            records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+
+            if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
+                log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " +
+                    updatedMember.subscribedTopicNames());
+
+                subscriptionMetadata = group.computeSubscriptionMetadata(
+                    member,
+                    updatedMember,
+                    topicsImage
+                );
+
+                // The subscription metadata record is only written when the metadata has changed.
+                if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                    log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+                        + subscriptionMetadata + ".");
+                    records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
+                }
+
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch));
+
+                log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
+            }
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
+        // delta between the existing and the new target assignment is persisted to the partition.
+        int targetAssignmentEpoch = group.assignmentEpoch();
+        Assignment targetAssignment = group.targetAssignment(memberId);
+        if (groupEpoch > targetAssignmentEpoch) {
+            // The default assignor is used when the group has no preferred server assignor.
+            String preferredServerAssignor = group.computePreferredServerAssignor(
+                member,
+                updatedMember
+            ).orElse(defaultAssignor.name());
+
+            try {
+                TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+                        .withMembers(group.members())
+                        .withSubscriptionMetadata(subscriptionMetadata)
+                        .withTargetAssignment(group.targetAssignment())
+                        .addOrUpdateMember(memberId, updatedMember)
+                        .build();
+
+                log.info("[GroupId " + groupId + "] Computed a new target assignment for epoch " + groupEpoch + ": "
+                    + assignmentResult.targetAssignment() + ".");
+
+                records.addAll(assignmentResult.records());
+                targetAssignment = assignmentResult.targetAssignment().get(memberId);
+                targetAssignmentEpoch = groupEpoch;
+            } catch (PartitionAssignorException ex) {
+                // A failure of the assignor is surfaced as an UnknownServerException.
+                String msg = "Failed to compute a new target assignment for epoch " + groupEpoch + ": " + ex + ".";
+                log.error("[GroupId " + groupId + "] " + msg);
+                throw new UnknownServerException(msg, ex);
+            }
+        }
+
+        // 3. Reconcile the member's assignment with the target assignment. This is only required if
+        // the member is not stable or if a new target assignment has been installed.
+        boolean assignmentUpdated = false;
+        if (updatedMember.state() != ConsumerGroupMember.MemberState.STABLE || updatedMember.targetMemberEpoch() != targetAssignmentEpoch) {
+            ConsumerGroupMember prevMember = updatedMember;
+            updatedMember = new CurrentAssignmentBuilder(updatedMember)
+                .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+                .withCurrentPartitionEpoch(group::currentPartitionEpoch)
+                .withOwnedTopicPartitions(ownedTopicPartitions)
+                .build();
+
+            // Checking the reference is enough here because a new instance
+            // is created only when the state has changed.
+            if (updatedMember != prevMember) {
+                assignmentUpdated = true;
+                records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+                log.info("[GroupId " + groupId + "] Member " + memberId + " transitioned from " +
+                    member.currentAssignmentSummary() + " to " + updatedMember.currentAssignmentSummary() + ".");
+
+                // TODO(dajac) Starts or restarts the timer for the revocation timeout.
+            }
+        }
+
+        // TODO(dajac) Starts or restarts the timer for the session timeout.
+
+        // Prepare the response.
+        ConsumerGroupHeartbeatResponseData response = new ConsumerGroupHeartbeatResponseData()
+            .setMemberId(updatedMember.memberId())
+            .setMemberEpoch(updatedMember.memberEpoch())
+            .setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs);
+
+        // The assignment is only provided in the following cases:
+        // 1. The member reported its owned partitions;
+        // 2. The member just joined or rejoined the group (epoch equals zero);
+        // 3. The member's assignment has been updated.
+        if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) {
+            response.setAssignment(createResponseAssignment(updatedMember));
+        }
+
+        return new Result<>(records, response);
+    }
+
+    /**
+     * Processes a ConsumerGroupHeartbeat request in which the member asks to
+     * leave its consumer group.
+     *
+     * @param groupId       The group id from the request.
+     * @param memberId      The member id from the request.
+     *
+     * @return A Result containing the ConsumerGroupHeartbeat response and
+     *         a list of records to update the state machine.
+     */
+    private Result<ConsumerGroupHeartbeatResponseData> consumerGroupLeave(
+        String groupId,
+        String memberId
+    ) throws ApiException {
+        // Neither the group nor the member is created by a leave request (both
+        // lookups are done with createIfNotExists set to false).
+        ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
+        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+
+        log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
+
+        List<Record> records = new ArrayList<>();
+
+        // Tombstones for the member. The order matters here: the current and the
+        // target assignments are deleted before the member subscription itself.
+        records.add(newCurrentAssignmentTombstoneRecord(groupId, memberId));
+        records.add(newTargetAssignmentTombstoneRecord(groupId, memberId));
+        records.add(newMemberSubscriptionTombstoneRecord(groupId, memberId));
+
+        // Recompute the subscription metadata as if the member was gone, i.e. with
+        // no new member replacing the old one. A record is only written on change.
+        Map<String, TopicMetadata> updatedSubscriptionMetadata =
+            group.computeSubscriptionMetadata(member, null, topicsImage);
+        boolean subscriptionMetadataChanged =
+            !updatedSubscriptionMetadata.equals(group.subscriptionMetadata());
+        if (subscriptionMetadataChanged) {
+            log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+                + updatedSubscriptionMetadata + ".");
+            records.add(newGroupSubscriptionMetadataRecord(groupId, updatedSubscriptionMetadata));
+        }
+
+        // The group epoch is always bumped when a member leaves.
+        records.add(newGroupEpochRecord(groupId, group.groupEpoch() + 1));
+
+        // The response echoes the member id with the leave epoch (-1).
+        ConsumerGroupHeartbeatResponseData response = new ConsumerGroupHeartbeatResponseData()
+            .setMemberId(memberId)
+            .setMemberEpoch(-1);
+        return new Result<>(records, response);
+    }
+
+    /**
+     * Entry point for a ConsumerGroupHeartbeat request. The request is validated
+     * first and then dispatched either to the leave path or to the regular
+     * heartbeat path depending on the member epoch it carries.
+     *
+     * @param context The request context.
+     * @param request The actual ConsumerGroupHeartbeat request.
+     *
+     * @return A Result containing the ConsumerGroupHeartbeat response and
+     *         a list of records to update the state machine.
+     */
+    public Result<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
+        RequestContext context,
+        ConsumerGroupHeartbeatRequestData request
+    ) throws ApiException {
+        throwIfConsumerGroupHeartbeatRequestIsInvalid(request);
+
+        // A member epoch of -1 means that the member wants to leave the group.
+        final boolean isLeaveRequest = request.memberEpoch() == -1;
+        if (isLeaveRequest) {
+            return consumerGroupLeave(request.groupId(), request.memberId());
+        }
+
+        // Any other epoch is a regular heartbeat. The client id and the client
+        // address are taken from the request context, the rest from the request.
+        return consumerGroupHeartbeat(
+            request.groupId(),
+            request.memberId(),
+            request.memberEpoch(),
+            request.instanceId(),
+            request.rackId(),
+            request.rebalanceTimeoutMs(),
+            context.clientId(),
+            context.clientAddress.toString(),
+            request.subscribedTopicNames(),
+            request.subscribedTopicRegex(),
+            request.serverAssignor(),
+            request.topicPartitions()
+        );
+    }
+
+    /**
+     * Replays ConsumerGroupMemberMetadataKey/Value to update the hard state of
+     * the consumer group. A non-null value updates the subscription part of the
+     * member, creating the group and the member if needed. A null value
+     * (tombstone) deletes the member.
+     *
+     * @param key   A ConsumerGroupMemberMetadataKey key.
+     * @param value A ConsumerGroupMemberMetadataValue record, or null for a tombstone.
+     *
+     * @throws IllegalStateException if a tombstone is replayed while the member still
+     *         has a current assignment (member epoch different from -1) or a target
+     *         assignment.
+     */
+    public void replay(
+        ConsumerGroupMemberMetadataKey key,
+        ConsumerGroupMemberMetadataValue value
+    ) {
+        final String groupId = key.groupId();
+        final String memberId = key.memberId();
+
+        // Only a regular record may create the group and the member on demand;
+        // a tombstone is looked up without creating anything.
+        final boolean isTombstone = value == null;
+        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, !isTombstone);
+        ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, !isTombstone);
+
+        if (!isTombstone) {
+            ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(oldMember)
+                .updateWith(value)
+                .build();
+            consumerGroup.updateMember(updatedMember);
+            return;
+        }
+
+        // The member can only be deleted once its current assignment has been
+        // tombstoned (which resets its epoch to -1) ...
+        if (oldMember.memberEpoch() != -1) {
+            throw new IllegalStateException("Received a tombstone record to delete member " + memberId
+                + " but did not receive ConsumerGroupCurrentMemberAssignmentValue tombstone.");
+        }
+        // ... and once its target assignment has been removed.
+        if (consumerGroup.targetAssignment().containsKey(memberId)) {
+            throw new IllegalStateException("Received a tombstone record to delete member " + memberId
+                + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
+        }
+        consumerGroup.removeMember(memberId);
+    }
+
+    /**
+     * Replays ConsumerGroupMetadataKey/Value to update the hard state of
+     * the consumer group. A non-null value sets the group epoch, creating the
+     * group if needed. A null value (tombstone) deletes the group.
+     *
+     * @param key   A ConsumerGroupMetadataKey key.
+     * @param value A ConsumerGroupMetadataValue record, or null for a tombstone.
+     *
+     * @throws IllegalStateException if a tombstone is replayed while the group still
+     *         has members, target assignments or a target assignment epoch
+     *         different from -1.
+     */
+    public void replay(
+        ConsumerGroupMetadataKey key,
+        ConsumerGroupMetadataValue value
+    ) {
+        final String groupId = key.groupId();
+
+        if (value != null) {
+            // Regular record: the group is created on demand.
+            getOrMaybeCreateConsumerGroup(groupId, true).setGroupEpoch(value.epoch());
+            return;
+        }
+
+        // Tombstone: the group is looked up without creating it and must have
+        // been fully emptied by the previously replayed records.
+        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, false);
+
+        if (!consumerGroup.members().isEmpty()) {
+            throw new IllegalStateException("Received a tombstone record to delete group " + groupId
+                + " but the group still has " + consumerGroup.members().size() + " members.");
+        }
+
+        if (!consumerGroup.targetAssignment().isEmpty()) {
+            throw new IllegalStateException("Received a tombstone record to delete group " + groupId
+                + " but the target assignment still has " + consumerGroup.targetAssignment().size()
+                + " members.");
+        }
+
+        // An epoch of -1 is what the replay of the target assignment metadata
+        // tombstone leaves behind.
+        if (consumerGroup.assignmentEpoch() != -1) {
+            throw new IllegalStateException("Received a tombstone record to delete group " + groupId
+                + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
+        }
+
+        removeGroup(groupId);
+    }
+
+    /**
+     * Replays ConsumerGroupPartitionMetadataKey/Value to update the hard state of
+     * the consumer group. A non-null value replaces the subscription metadata of
+     * the group; a null value (tombstone) resets it to an empty map.
+     *
+     * @param key   A ConsumerGroupPartitionMetadataKey key.
+     * @param value A ConsumerGroupPartitionMetadataValue record, or null for a tombstone.
+     */
+    public void replay(
+        ConsumerGroupPartitionMetadataKey key,
+        ConsumerGroupPartitionMetadataValue value
+    ) {
+        // The group is looked up without creating it, for records and tombstones alike.
+        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(key.groupId(), false);
+
+        if (value == null) {
+            consumerGroup.setSubscriptionMetadata(Collections.emptyMap());
+            return;
+        }
+
+        // Index the topic metadata of the record by topic name.
+        Map<String, TopicMetadata> metadataByTopicName = new HashMap<>();
+        value.topics().forEach(topic ->
+            metadataByTopicName.put(topic.topicName(), TopicMetadata.fromRecord(topic))
+        );
+        consumerGroup.setSubscriptionMetadata(metadataByTopicName);
+    }
+
+    /**
+     * Replays ConsumerGroupTargetAssignmentMemberKey/Value to update the hard state of
+     * the consumer group. A non-null value installs the target assignment of the
+     * member; a null value (tombstone) removes it.
+     *
+     * @param key   A ConsumerGroupTargetAssignmentMemberKey key.
+     * @param value A ConsumerGroupTargetAssignmentMemberValue record, or null for a tombstone.
+     */
+    public void replay(
+        ConsumerGroupTargetAssignmentMemberKey key,
+        ConsumerGroupTargetAssignmentMemberValue value
+    ) {
+        final String memberId = key.memberId();
+        // The group is looked up without creating it.
+        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(key.groupId(), false);
+
+        if (value == null) {
+            consumerGroup.removeTargetAssignment(memberId);
+            return;
+        }
+
+        Assignment newTargetAssignment = Assignment.fromRecord(value);
+        consumerGroup.updateTargetAssignment(memberId, newTargetAssignment);
+    }
+
+    /**
+     * Replays ConsumerGroupTargetAssignmentMetadataKey/Value to update the hard state of
+     * the consumer group. A non-null value sets the target assignment epoch; a null
+     * value (tombstone) sets it to -1 to signal that the target assignment has been
+     * deleted.
+     *
+     * @param key   A ConsumerGroupTargetAssignmentMetadataKey key.
+     * @param value A ConsumerGroupTargetAssignmentMetadataValue record, or null for a tombstone.
+     *
+     * @throws IllegalStateException if a tombstone is replayed while some members still
+     *         have a target assignment.
+     */
+    public void replay(
+        ConsumerGroupTargetAssignmentMetadataKey key,
+        ConsumerGroupTargetAssignmentMetadataValue value
+    ) {
+        final String groupId = key.groupId();
+        // The group is looked up without creating it.
+        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, false);
+
+        if (value != null) {
+            consumerGroup.setTargetAssignmentEpoch(value.assignmentEpoch());
+            return;
+        }
+
+        // The per-member target assignments must have been removed before the
+        // target assignment metadata itself.
+        if (!consumerGroup.targetAssignment().isEmpty()) {
+            throw new IllegalStateException("Received a tombstone record to delete target assignment of " + groupId
+                + " but the assignment still has " + consumerGroup.targetAssignment().size() + " members.");
+        }
+        consumerGroup.setTargetAssignmentEpoch(-1);
+    }
+
+    /**
+     * Replays ConsumerGroupCurrentMemberAssignmentKey/Value to update the hard state of
+     * the consumer group. A non-null value updates the current assignment of the
+     * member. A null value (tombstone) resets the member epochs to -1 and clears its
+     * assigned partitions, its partitions pending revocation and its partitions
+     * pending assignment; the member itself stays in the group.
+     *
+     * @param key   A ConsumerGroupCurrentMemberAssignmentKey key.
+     * @param value A ConsumerGroupCurrentMemberAssignmentValue record, or null for a tombstone.
+     */
+    public void replay(
+        ConsumerGroupCurrentMemberAssignmentKey key,
+        ConsumerGroupCurrentMemberAssignmentValue value
+    ) {
+        // Both the group and the member are looked up without creating them.
+        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(key.groupId(), false);
+        ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(key.memberId(), false);
+
+        final ConsumerGroupMember newMember;
+        if (value == null) {
+            // Tombstone: wipe the assignment part of the member.
+            newMember = new ConsumerGroupMember.Builder(oldMember)
+                .setMemberEpoch(-1)
+                .setPreviousMemberEpoch(-1)
+                .setTargetMemberEpoch(-1)
+                .setAssignedPartitions(Collections.emptyMap())
+                .setPartitionsPendingRevocation(Collections.emptyMap())
+                .setPartitionsPendingAssignment(Collections.emptyMap())
+                .build();
+        } else {
+            newMember = new ConsumerGroupMember.Builder(oldMember)
+                .updateWith(value)
+                .build();
+        }
+
+        consumerGroup.updateMember(newMember);
+    }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
index bf6cd62a90f..fc041e3351f 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
@@ -330,7 +330,7 @@ public class RecordHelpers {
                 new ConsumerGroupCurrentMemberAssignmentValue()
                     .setMemberEpoch(member.memberEpoch())
                     .setPreviousMemberEpoch(member.previousMemberEpoch())
-                    .setTargetMemberEpoch(member.nextMemberEpoch())
+                    .setTargetMemberEpoch(member.targetMemberEpoch())
                     .setAssignedPartitions(toTopicPartitions(member.assignedPartitions()))
                     .setPartitionsPendingRevocation(toTopicPartitions(member.partitionsPendingRevocation()))
                     .setPartitionsPendingAssignment(toTopicPartitions(member.partitionsPendingAssignment())),
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
new file mode 100644
index 00000000000..9df00733b30
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A Consumer Group. All the metadata in this class are backed by
+ * records in the __consumer_offsets partitions.
+ */
+public class ConsumerGroup implements Group {
+
+    /**
+     * The states of a consumer group. Each constant carries the lower-case
+     * label returned by {@link #toString()}.
+     */
+    public enum ConsumerGroupState {
+        EMPTY("empty"),
+        ASSIGNING("assigning"),
+        RECONCILING("reconciling"),
+        STABLE("stable"),
+        DEAD("dead");
+
+        /**
+         * The label of the state.
+         */
+        private final String label;
+
+        ConsumerGroupState(String label) {
+            this.label = label;
+        }
+
+        /**
+         * @return The label of the state.
+         */
+        @Override
+        public String toString() {
+            return label;
+        }
+    }
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<ConsumerGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the subscriptions
+     * are updated and it will trigger the computation of a new assignment
+     * for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, ConsumerGroupMember> members;
+
+    /**
+     * The number of members supporting each server assignor name.
+     */
+    private final TimelineHashMap<String, Integer> serverAssignors;
+
+    /**
+     * The number of subscribers per topic.
+     */
+    private final TimelineHashMap<String, Integer> subscribedTopicNames;
+
+    /**
+     * The metadata associated with each subscribed topic name.
+     */
+    private final TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata;
+
+    /**
+     * The target assignment epoch. An assignment epoch smaller than the group epoch
+     * means that a new assignment is required. The assignment epoch is updated when
+     * a new assignment is installed.
+     */
+    private final TimelineInteger targetAssignmentEpoch;
+
+    /**
+     * The target assignment per member id.
+     */
+    private final TimelineHashMap<String, Assignment> targetAssignment;
+
+    /**
+     * The current partition epoch maps each topic-partition to its current epoch, where
+     * the epoch is the member epoch of its owner. When a member revokes a partition, the
+     * partition is removed from this map. When a member gets a partition, the partition
+     * is added to this map with the epoch of the member.
+     */
+    private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
+
+    /**
+     * Creates an empty consumer group. The group starts in the EMPTY state and
+     * all its timeline structures are registered with the provided snapshot
+     * registry.
+     *
+     * @param snapshotRegistry  The snapshot registry; must not be null.
+     * @param groupId           The group id; must not be null.
+     *
+     * @throws NullPointerException if any argument is null.
+     */
+    public ConsumerGroup(
+        SnapshotRegistry snapshotRegistry,
+        String groupId
+    ) {
+        // Identity of the group.
+        this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+        this.groupId = Objects.requireNonNull(groupId);
+
+        // Group level state and epochs.
+        this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY);
+        this.groupEpoch = new TimelineInteger(snapshotRegistry);
+
+        // Members and the counters derived from them.
+        this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+
+        // Target assignment and partition ownership.
+        this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+        this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
+    }
+
+    /**
+     * Identifies this group as a consumer group.
+     *
+     * @return Always {@code GroupType.CONSUMER}.
+     */
+    @Override
+    public GroupType type() {
+        return GroupType.CONSUMER;
+    }
+
+    /**
+     * @return The current state as a String.
+     */
+    @Override
+    public String stateAsString() {
+        return state.get().toString();
+    }
+
+    /**
+     * Returns the id that was given to this group at construction time.
+     *
+     * @return The group id.
+     */
+    @Override
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * Returns the state of the group as last computed when the group epoch,
+     * the target assignment epoch or a member was updated.
+     *
+     * @return The current state.
+     */
+    public ConsumerGroupState state() {
+        return state.get();
+    }
+
+    /**
+     * Returns the current epoch of the group.
+     *
+     * @return The group epoch.
+     */
+    public int groupEpoch() {
+        return groupEpoch.get();
+    }
+
+    /**
+     * Sets the group epoch and recomputes the state of the group.
+     *
+     * @param groupEpoch The new group epoch.
+     */
+    public void setGroupEpoch(int groupEpoch) {
+        this.groupEpoch.set(groupEpoch);
+        // The state depends on how the group epoch compares to the target
+        // assignment epoch, so it must be refreshed.
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Returns the epoch of the target assignment currently installed.
+     *
+     * @return The target assignment epoch.
+     */
+    public int assignmentEpoch() {
+        return targetAssignmentEpoch.get();
+    }
+
+    /**
+     * Sets the target assignment epoch and recomputes the state of the group.
+     *
+     * @param targetAssignmentEpoch The new target assignment epoch.
+     */
+    public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
+        this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+        // The state depends on how the group epoch compares to the target
+        // assignment epoch, so it must be refreshed.
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Looks up a member by its id and optionally creates it when it is missing.
+     * A newly created member is built with the defaults of the member builder
+     * and is stored in the group right away.
+     *
+     * @param memberId          The member id.
+     * @param createIfNotExists Boolean indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     *
+     * @throws UnknownMemberIdException if the member does not exist and
+     *         createIfNotExists is false.
+     */
+    public ConsumerGroupMember getOrMaybeCreateMember(
+        String memberId,
+        boolean createIfNotExists
+    ) {
+        ConsumerGroupMember existingMember = members.get(memberId);
+        if (existingMember != null) {
+            return existingMember;
+        }
+
+        if (!createIfNotExists) {
+            throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
+                memberId, groupId));
+        }
+
+        ConsumerGroupMember createdMember = new ConsumerGroupMember.Builder(memberId).build();
+        members.put(memberId, createdMember);
+        return createdMember;
+    }
+
+    /**
+     * Stores the new version of a member, replacing the previous one if any,
+     * and refreshes everything derived from the members: the subscribed topic
+     * names count, the server assignors count, the partition epochs and the
+     * group state.
+     *
+     * @param newMember The new member state.
+     *
+     * @throws IllegalArgumentException if newMember is null.
+     */
+    public void updateMember(ConsumerGroupMember newMember) {
+        if (newMember == null) {
+            throw new IllegalArgumentException("newMember cannot be null.");
+        }
+
+        // The previous version, or null if the member is new, is needed to
+        // undo its contribution to the derived structures.
+        final ConsumerGroupMember previousMember = members.put(newMember.memberId(), newMember);
+
+        maybeUpdateSubscribedTopicNames(previousMember, newMember);
+        maybeUpdateServerAssignors(previousMember, newMember);
+        maybeUpdatePartitionEpoch(previousMember, newMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Removes the member from the group and releases everything that was
+     * accounted to it: its subscribed topic names, its server assignor and
+     * the epochs of the partitions it owned. The group state is recomputed
+     * afterwards. Removing an unknown member id only recomputes the state.
+     *
+     * @param memberId The member id to remove.
+     */
+    public void removeMember(String memberId) {
+        ConsumerGroupMember oldMember = members.remove(memberId);
+        // Passing null as the new member only decrements the counts held by the
+        // removed member. Without this, the topics and the assignor of a departed
+        // member would keep being counted and would leak into the subscription
+        // metadata and the preferred assignor computed for the remaining members.
+        maybeUpdateSubscribedTopicNames(oldMember, null);
+        maybeUpdateServerAssignors(oldMember, null);
+        maybeRemovePartitionEpoch(oldMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Tells whether a member with the given id is currently part of the group.
+     *
+     * @param memberId The member id.
+     *
+     * @return True if the member exists; false otherwise.
+     */
+    public boolean hasMember(String memberId) {
+        return members.containsKey(memberId);
+    }
+
+    /**
+     * Returns how many members are currently stored in the group.
+     *
+     * @return The number of members.
+     */
+    public int numMembers() {
+        return members.size();
+    }
+
+    /**
+     * Returns a read-only view of the members. The view is backed by the
+     * internal map, so it reflects later changes to the group.
+     *
+     * @return An unmodifiable Map containing all the members keyed by their id.
+     */
+    public Map<String, ConsumerGroupMember> members() {
+        return Collections.unmodifiableMap(members);
+    }
+
+    /**
+     * Returns the target assignment of the member.
+     *
+     * @param memberId The member id.
+     *
+     * @return The target Assignment of the member or {@code Assignment.EMPTY}
+     *         if the member does not have one.
+     */
+    public Assignment targetAssignment(String memberId) {
+        final Assignment fallback = Assignment.EMPTY;
+        return targetAssignment.getOrDefault(memberId, fallback);
+    }
+
+    /**
+     * Installs the target assignment of a member, replacing the previous
+     * one if the member already had a target assignment.
+     *
+     * @param memberId              The member id.
+     * @param newTargetAssignment   The new target assignment.
+     */
+    public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
+        targetAssignment.put(memberId, newTargetAssignment);
+    }
+
+    /**
+     * Removes the target assignment of a member. This is a no-op if the
+     * member does not have a target assignment.
+     *
+     * @param memberId The member id.
+     */
+    public void removeTargetAssignment(String memberId) {
+        targetAssignment.remove(memberId);
+    }
+
+    /**
+     * Returns a read-only view of the target assignments. The view is backed
+     * by the internal map, so it reflects later changes to the group.
+     *
+     * @return An unmodifiable Map containing all the target assignments keyed by member id.
+     */
+    public Map<String, Assignment> targetAssignment() {
+        return Collections.unmodifiableMap(targetAssignment);
+    }
+
+    /**
+     * Looks up the current epoch of a partition. A partition has no epoch when
+     * neither its topic nor the partition itself is tracked, in which case -1
+     * is returned.
+     *
+     * @param topicId       The topic id.
+     * @param partitionId   The partition id.
+     *
+     * @return The epoch or -1.
+     */
+    public int currentPartitionEpoch(
+        Uuid topicId, int partitionId
+    ) {
+        Map<Integer, Integer> epochByPartition = currentPartitionEpoch.get(topicId);
+        if (epochByPartition == null) {
+            // No partition of this topic is tracked.
+            return -1;
+        }
+        return epochByPartition.getOrDefault(partitionId, -1);
+    }
+
+    /**
+     * Computes the preferred (server side) assignor for the group while
+     * taking into account the updated member. The preferred assignor is the
+     * one with the highest member count. The computation works on a copy of
+     * the persisted {@link ConsumerGroup#serverAssignors} structure and
+     * therefore does not update it.
+     *
+     * @param oldMember The old member.
+     * @param newMember The new member.
+     *
+     * @return An Optional containing the preferred assignor, or an empty
+     *         Optional when no assignor is counted.
+     */
+    public Optional<String> computePreferredServerAssignor(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        // Apply the member update to a private copy of the counts.
+        Map<String, Integer> updatedCounts = new HashMap<>(this.serverAssignors);
+        maybeUpdateServerAssignors(updatedCounts, oldMember, newMember);
+
+        Optional<Map.Entry<String, Integer>> mostUsed = updatedCounts.entrySet().stream()
+            .max(Map.Entry.comparingByValue());
+        return mostUsed.map(Map.Entry::getKey);
+    }
+
+    /**
+     * @return The preferred assignor for the group.
+     */
+    public Optional<String> preferredServerAssignor() {
+        return serverAssignors.entrySet().stream()
+            .max(Map.Entry.comparingByValue())
+            .map(Map.Entry::getKey);
+    }
+
+    /**
+     * Returns a read-only view of the subscription metadata. The view is backed
+     * by the internal map, so it reflects later changes to the group.
+     *
+     * @return An unmodifiable Map containing the subscription metadata of the
+     *         subscribed topics, keyed by topic name.
+     */
+    public Map<String, TopicMetadata> subscriptionMetadata() {
+        return Collections.unmodifiableMap(subscribedTopicMetadata);
+    }
+
+    /**
+     * Updates the subscription metadata. The previous content is dropped
+     * entirely and replaced by a copy of the provided entries.
+     *
+     * @param subscriptionMetadata The new subscription metadata.
+     */
+    public void setSubscriptionMetadata(
+        Map<String, TopicMetadata> subscriptionMetadata
+    ) {
+        // Clear first so that topics missing from the new metadata do not survive.
+        this.subscribedTopicMetadata.clear();
+        this.subscribedTopicMetadata.putAll(subscriptionMetadata);
+    }
+
+    /**
+     * Computes the subscription metadata that the group would have once the
+     * old member is replaced by the new one. The persisted subscription counts
+     * are copied, not modified. Subscribed topics which are not present in the
+     * topics image are left out of the result.
+     *
+     * @param oldMember     The old member, or null if there is none.
+     * @param newMember     The new member, or null if the member is going away.
+     * @param topicsImage   The topic metadata.
+     *
+     * @return The new subscription metadata as an unmodifiable Map keyed by topic name.
+     */
+    public Map<String, TopicMetadata> computeSubscriptionMetadata(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember,
+        TopicsImage topicsImage
+    ) {
+        // Apply the member update to a private copy of the subscription counts.
+        Map<String, Integer> updatedTopicCounts = new HashMap<>(this.subscribedTopicNames);
+        maybeUpdateSubscribedTopicNames(updatedTopicCounts, oldMember, newMember);
+
+        // Resolve each subscribed topic against the image.
+        Map<String, TopicMetadata> metadataByTopicName = new HashMap<>(updatedTopicCounts.size());
+        for (String topicName : updatedTopicCounts.keySet()) {
+            TopicImage topicImage = topicsImage.getTopic(topicName);
+            if (topicImage == null) {
+                // The topic is not in the image; nothing to record for it.
+                continue;
+            }
+            TopicMetadata topicMetadata = new TopicMetadata(
+                topicImage.id(),
+                topicImage.name(),
+                topicImage.partitions().size()
+            );
+            metadataByTopicName.put(topicName, topicMetadata);
+        }
+
+        return Collections.unmodifiableMap(metadataByTopicName);
+    }
+
+    /**
+     * Updates the current state of the group.
+     */
+    private void maybeUpdateGroupState() {
+        if (members.isEmpty()) {
+            state.set(ConsumerGroupState.EMPTY);
+        } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
+            state.set(ConsumerGroupState.ASSIGNING);
+        } else {
+            for (ConsumerGroupMember member : members.values()) {
+                if (member.targetMemberEpoch() != targetAssignmentEpoch.get() || member.state() != ConsumerGroupMember.MemberState.STABLE) {
+                    state.set(ConsumerGroupState.RECONCILING);
+                    return;
+                }
+            }
+
+            state.set(ConsumerGroupState.STABLE);
+        }
+    }
+
+    /**
+     * Updates the persisted server assignors count of the group: the assignor
+     * of the old member, if any, is decremented and the assignor of the new
+     * member, if any, is incremented.
+     *
+     * @param oldMember The old member, or null if there is none.
+     * @param newMember The new member, or null if there is none.
+     */
+    private void maybeUpdateServerAssignors(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        maybeUpdateServerAssignors(serverAssignors, oldMember, newMember);
+    }
+
+    /**
+     * Updates the provided server assignors count: the assignor of the old
+     * member, if any, is decremented and the assignor of the new member, if
+     * any, is incremented. A null member or a member without a server
+     * assignor name does not contribute.
+     *
+     * @param serverAssignorCount   The count to update.
+     * @param oldMember             The old member, or null if there is none.
+     * @param newMember             The new member, or null if there is none.
+     */
+    private static void maybeUpdateServerAssignors(
+        Map<String, Integer> serverAssignorCount,
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        Optional<String> previousAssignor = oldMember == null
+            ? Optional.empty()
+            : oldMember.serverAssignorName();
+        previousAssignor.ifPresent(assignorName ->
+            serverAssignorCount.compute(assignorName, ConsumerGroup::decValue));
+
+        Optional<String> updatedAssignor = newMember == null
+            ? Optional.empty()
+            : newMember.serverAssignorName();
+        updatedAssignor.ifPresent(assignorName ->
+            serverAssignorCount.compute(assignorName, ConsumerGroup::incValue));
+    }
+
+    /**
+     * Updates the persisted subscribed topic names count of the group: the
+     * topics of the old member, if any, are decremented and the topics of the
+     * new member, if any, are incremented.
+     *
+     * @param oldMember The old member, or null if there is none.
+     * @param newMember The new member, or null if there is none.
+     */
+    private void maybeUpdateSubscribedTopicNames(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember);
+    }
+
+    /**
+     * Updates the provided subscription count: every topic subscribed by the
+     * old member, if any, is decremented and every topic subscribed by the new
+     * member, if any, is incremented.
+     *
+     * @param subscribedTopicCount  The map to update.
+     * @param oldMember             The old member, or null if there is none.
+     * @param newMember             The new member, or null if there is none.
+     */
+    private static void maybeUpdateSubscribedTopicNames(
+        Map<String, Integer> subscribedTopicCount,
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        if (oldMember != null) {
+            for (String topicName : oldMember.subscribedTopicNames()) {
+                subscribedTopicCount.compute(topicName, ConsumerGroup::decValue);
+            }
+        }
+
+        if (newMember != null) {
+            for (String topicName : newMember.subscribedTopicNames()) {
+                subscribedTopicCount.compute(topicName, ConsumerGroup::incValue);
+            }
+        }
+    }
+
+    /**
+     * Updates the partition epochs based on the old and the new member. All
+     * the partitions owned by the old member (assigned or pending revocation)
+     * are released first; then all the partitions owned by the new member are
+     * registered with the epoch of the new member.
+     *
+     * @param oldMember The old member, or null if the member is new.
+     * @param newMember The new member; must not be null.
+     */
+    private void maybeUpdatePartitionEpoch(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        // Every removal must happen before any addition. Interleaving them per
+        // set loses a partition that moves from pending revocation back to
+        // assigned: it would be added with the assigned set and then wrongly
+        // removed along with the old pending revocation set.
+        if (oldMember != null) {
+            removePartitionEpochs(oldMember.assignedPartitions());
+            removePartitionEpochs(oldMember.partitionsPendingRevocation());
+        }
+
+        // Re-adding unconditionally also refreshes the epochs when only the
+        // member epoch changed while the owned partitions stayed the same.
+        addPartitionEpochs(newMember.assignedPartitions(), newMember.memberEpoch());
+        addPartitionEpochs(newMember.partitionsPendingRevocation(), newMember.memberEpoch());
+    }
+
+    /**
+     * Releases the epochs of all the partitions owned by the provided member,
+     * i.e. its assigned partitions and its partitions pending revocation.
+     * Nothing happens when the member is null.
+     *
+     * @param oldMember The old member, or null.
+     */
+    private void maybeRemovePartitionEpoch(
+        ConsumerGroupMember oldMember
+    ) {
+        if (oldMember == null) {
+            return;
+        }
+
+        removePartitionEpochs(oldMember.assignedPartitions());
+        removePartitionEpochs(oldMember.partitionsPendingRevocation());
+    }
+
+    /**
+     * Removes the epochs of the partitions listed in the provided assignment.
+     * A topic is dropped from the map altogether once none of its partitions
+     * has an epoch left. Topics that are not tracked are ignored.
+     *
+     * @param assignment    The assignment.
+     */
+    private void removePartitionEpochs(
+        Map<Uuid, Set<Integer>> assignment
+    ) {
+        for (Map.Entry<Uuid, Set<Integer>> topicEntry : assignment.entrySet()) {
+            Set<Integer> releasedPartitions = topicEntry.getValue();
+            currentPartitionEpoch.compute(topicEntry.getKey(), (topicId, epochs) -> {
+                if (epochs == null) {
+                    return null;
+                }
+                for (Integer partitionId : releasedPartitions) {
+                    epochs.remove(partitionId);
+                }
+                // Returning null from compute removes the mapping of the topic.
+                return epochs.isEmpty() ? null : epochs;
+            });
+        }
+    }
+
+    /**
+     * Registers the provided epoch for every partition listed in the provided
+     * assignment, overwriting the epoch of partitions that already have one.
+     * The per-topic map is created on demand.
+     *
+     * @param assignment    The assignment.
+     * @param epoch         The new epoch.
+     */
+    private void addPartitionEpochs(
+        Map<Uuid, Set<Integer>> assignment,
+        int epoch
+    ) {
+        for (Map.Entry<Uuid, Set<Integer>> topicEntry : assignment.entrySet()) {
+            Set<Integer> ownedPartitions = topicEntry.getValue();
+            currentPartitionEpoch.compute(topicEntry.getKey(), (topicId, existingEpochs) -> {
+                TimelineHashMap<Integer, Integer> epochs = existingEpochs;
+                if (epochs == null) {
+                    epochs = new TimelineHashMap<>(snapshotRegistry, ownedPartitions.size());
+                }
+                for (Integer partitionId : ownedPartitions) {
+                    epochs.put(partitionId, epoch);
+                }
+                return epochs;
+            });
+        }
+    }
+
+    /**
+     * Decrements value by 1. Returns null when the value is absent or when
+     * the count reaches zero, which removes the mapping when this helper is
+     * used with Map#compute.
+     */
+    private static Integer decValue(String key, Integer value) {
+        if (value == null || value == 1) {
+            return null;
+        }
+        return value - 1;
+    }
+
+    /**
+     * Increments value by 1, starting at 1 when the value is absent. This
+     * helper is meant to be used with Map#compute.
+     */
+    private static Integer incValue(String key, Integer value) {
+        if (value == null) {
+            return 1;
+        }
+        return value + 1;
+    }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
index c40bb7c937c..729edf8d06b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
@@ -48,7 +48,7 @@ public class ConsumerGroupMember {
         private final String memberId;
         private int memberEpoch = 0;
         private int previousMemberEpoch = -1;
-        private int nextMemberEpoch = 0;
+        private int targetMemberEpoch = 0;
         private String instanceId = null;
         private String rackId = null;
         private int rebalanceTimeoutMs = -1;
@@ -72,7 +72,7 @@ public class ConsumerGroupMember {
             this.memberId = member.memberId;
             this.memberEpoch = member.memberEpoch;
             this.previousMemberEpoch = member.previousMemberEpoch;
-            this.nextMemberEpoch = member.nextMemberEpoch;
+            this.targetMemberEpoch = member.targetMemberEpoch;
             this.instanceId = member.instanceId;
             this.rackId = member.rackId;
             this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
@@ -97,8 +97,8 @@ public class ConsumerGroupMember {
             return this;
         }
 
-        public Builder setNextMemberEpoch(int nextMemberEpoch) {
-            this.nextMemberEpoch = nextMemberEpoch;
+        public Builder setTargetMemberEpoch(int targetMemberEpoch) {
+            this.targetMemberEpoch = targetMemberEpoch;
             return this;
         }
 
@@ -217,7 +217,7 @@ public class ConsumerGroupMember {
         public Builder updateWith(ConsumerGroupCurrentMemberAssignmentValue record) {
             setMemberEpoch(record.memberEpoch());
             setPreviousMemberEpoch(record.previousMemberEpoch());
-            setNextMemberEpoch(record.targetMemberEpoch());
+            setTargetMemberEpoch(record.targetMemberEpoch());
             setAssignedPartitions(assignmentFromTopicPartitions(record.assignedPartitions()));
             setPartitionsPendingRevocation(assignmentFromTopicPartitions(record.partitionsPendingRevocation()));
             setPartitionsPendingAssignment(assignmentFromTopicPartitions(record.partitionsPendingAssignment()));
@@ -246,7 +246,7 @@ public class ConsumerGroupMember {
                 memberId,
                 memberEpoch,
                 previousMemberEpoch,
-                nextMemberEpoch,
+                targetMemberEpoch,
                 instanceId,
                 rackId,
                 rebalanceTimeoutMs,
@@ -305,7 +305,7 @@ public class ConsumerGroupMember {
      * assignment epoch used to compute the current assigned,
      * revoking and assigning partitions.
      */
-    private final int nextMemberEpoch;
+    private final int targetMemberEpoch;
 
     /**
      * The instance id provided by the member.
@@ -378,7 +378,7 @@ public class ConsumerGroupMember {
         String memberId,
         int memberEpoch,
         int previousMemberEpoch,
-        int nextMemberEpoch,
+        int targetMemberEpoch,
         String instanceId,
         String rackId,
         int rebalanceTimeoutMs,
@@ -396,7 +396,7 @@ public class ConsumerGroupMember {
         this.memberId = memberId;
         this.memberEpoch = memberEpoch;
         this.previousMemberEpoch = previousMemberEpoch;
-        this.nextMemberEpoch = nextMemberEpoch;
+        this.targetMemberEpoch = targetMemberEpoch;
         this.instanceId = instanceId;
         this.rackId = rackId;
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
@@ -434,10 +434,10 @@ public class ConsumerGroupMember {
     }
 
     /**
-     * @return The next member epoch.
+     * @return The target member epoch.
      */
-    public int nextMemberEpoch() {
-        return nextMemberEpoch;
+    public int targetMemberEpoch() {
+        return targetMemberEpoch;
     }
 
     /**
@@ -535,10 +535,9 @@ public class ConsumerGroupMember {
      * @return A string representation of the current assignment state.
      */
     public String currentAssignmentSummary() {
-        return "CurrentAssignment(" +
-            ", memberEpoch=" + memberEpoch +
+        return "CurrentAssignment(memberEpoch=" + memberEpoch +
             ", previousMemberEpoch=" + previousMemberEpoch +
-            ", nextMemberEpoch=" + nextMemberEpoch +
+            ", targetMemberEpoch=" + targetMemberEpoch +
             ", state=" + state +
             ", assignedPartitions=" + assignedPartitions +
             ", partitionsPendingRevocation=" + partitionsPendingRevocation +
@@ -553,7 +552,7 @@ public class ConsumerGroupMember {
         ConsumerGroupMember that = (ConsumerGroupMember) o;
         return memberEpoch == that.memberEpoch
             && previousMemberEpoch == that.previousMemberEpoch
-            && nextMemberEpoch == that.nextMemberEpoch
+            && targetMemberEpoch == that.targetMemberEpoch
             && rebalanceTimeoutMs == that.rebalanceTimeoutMs
             && Objects.equals(memberId, that.memberId)
             && Objects.equals(instanceId, that.instanceId)
@@ -574,7 +573,7 @@ public class ConsumerGroupMember {
         int result = memberId != null ? memberId.hashCode() : 0;
         result = 31 * result + memberEpoch;
         result = 31 * result + previousMemberEpoch;
-        result = 31 * result + nextMemberEpoch;
+        result = 31 * result + targetMemberEpoch;
         result = 31 * result + Objects.hashCode(instanceId);
         result = 31 * result + Objects.hashCode(rackId);
         result = 31 * result + rebalanceTimeoutMs;
@@ -596,7 +595,7 @@ public class ConsumerGroupMember {
             "memberId='" + memberId + '\'' +
             ", memberEpoch=" + memberEpoch +
             ", previousMemberEpoch=" + previousMemberEpoch +
-            ", nextMemberEpoch=" + nextMemberEpoch +
+            ", targetMemberEpoch=" + targetMemberEpoch +
             ", instanceId='" + instanceId + '\'' +
             ", rackId='" + rackId + '\'' +
             ", rebalanceTimeoutMs=" + rebalanceTimeoutMs +
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java
index 6a255ae8e53..fce5b8a85bd 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java
@@ -172,7 +172,7 @@ public class CurrentAssignmentBuilder {
     public ConsumerGroupMember build() {
         // A new target assignment has been installed, we need to restart
         // the reconciliation loop from the beginning.
-        if (targetAssignmentEpoch != member.nextMemberEpoch()) {
+        if (targetAssignmentEpoch != member.targetMemberEpoch()) {
             return transitionToNewTargetAssignmentState();
         }
 
@@ -258,7 +258,7 @@ public class CurrentAssignmentBuilder {
                 .setAssignedPartitions(newAssignedPartitions)
                 .setPartitionsPendingRevocation(newPartitionsPendingRevocation)
                 .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
-                .setNextMemberEpoch(targetAssignmentEpoch)
+                .setTargetMemberEpoch(targetAssignmentEpoch)
                 .build();
         } else {
             if (!newPartitionsPendingAssignment.isEmpty()) {
@@ -277,7 +277,7 @@ public class CurrentAssignmentBuilder {
                 .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
                 .setPreviousMemberEpoch(member.memberEpoch())
                 .setMemberEpoch(targetAssignmentEpoch)
-                .setNextMemberEpoch(targetAssignmentEpoch)
+                .setTargetMemberEpoch(targetAssignmentEpoch)
                 .build();
         }
     }
@@ -311,7 +311,7 @@ public class CurrentAssignmentBuilder {
                 .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
                 .setPreviousMemberEpoch(member.memberEpoch())
                 .setMemberEpoch(targetAssignmentEpoch)
-                .setNextMemberEpoch(targetAssignmentEpoch)
+                .setTargetMemberEpoch(targetAssignmentEpoch)
                 .build();
         } else {
             return member;
@@ -340,7 +340,7 @@ public class CurrentAssignmentBuilder {
                 .setPartitionsPendingAssignment(newPartitionsPendingAssignment)
                 .setPreviousMemberEpoch(member.memberEpoch())
                 .setMemberEpoch(targetAssignmentEpoch)
-                .setNextMemberEpoch(targetAssignmentEpoch)
+                .setTargetMemberEpoch(targetAssignmentEpoch)
                 .build();
         } else {
             return member;
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
new file mode 100644
index 00000000000..9725a61aa4e
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -0,0 +1,2073 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.AssertionFailedError;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class GroupMetadataManagerTest {
+    /**
+     * A {@link PartitionAssignor} stub which returns the assignment prepared
+     * by the test, whatever the spec passed to it.
+     */
+    static class MockPartitionAssignor implements PartitionAssignor {
+        private final String name;
+        // The canned result of assign(); null until a test prepares one.
+        private GroupAssignment preparedAssignment = null;
+
+        MockPartitionAssignor(String name) {
+            this.name = name;
+        }
+
+        /**
+         * Sets the assignment returned by the following calls to assign().
+         */
+        public void prepareGroupAssignment(GroupAssignment prepareGroupAssignment) {
+            preparedAssignment = prepareGroupAssignment;
+        }
+
+        @Override
+        public String name() {
+            return this.name;
+        }
+
+        /**
+         * @return The prepared assignment; the spec is ignored.
+         */
+        @Override
+        public GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException {
+            return preparedAssignment;
+        }
+    }
+
+    /**
+     * Builds a {@link TopicsImage} by replaying topic and partition records
+     * onto a delta which starts from the empty image.
+     */
+    public static class TopicsImageBuilder {
+        private TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
+
+        /**
+         * Adds a topic with partitions numbered from 0 to numPartitions - 1.
+         *
+         * @param topicId       The topic id.
+         * @param topicName     The topic name.
+         * @param numPartitions The number of partitions to add.
+         * @return This builder.
+         */
+        public TopicsImageBuilder addTopic(
+            Uuid topicId,
+            String topicName,
+            int numPartitions
+        ) {
+            TopicRecord topicRecord = new TopicRecord()
+                .setTopicId(topicId)
+                .setName(topicName);
+            delta.replay(topicRecord);
+
+            for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+                PartitionRecord partitionRecord = new PartitionRecord()
+                    .setTopicId(topicId)
+                    .setPartitionId(partitionId);
+                delta.replay(partitionRecord);
+            }
+            return this;
+        }
+
+        /**
+         * @return The image obtained by applying the accumulated delta.
+         */
+        public TopicsImage build() {
+            return delta.apply();
+        }
+    }
+
+    /**
+     * Describes a consumer group for a test and converts that description
+     * into the list of records to replay.
+     */
+    static class ConsumerGroupBuilder {
+        private final String groupId;
+        private final int groupEpoch;
+        private int assignmentEpoch;
+        private final Map<String, ConsumerGroupMember> members = new HashMap<>();
+        private final Map<String, Assignment> assignments = new HashMap<>();
+
+        public ConsumerGroupBuilder(String groupId, int groupEpoch) {
+            this.groupId = groupId;
+            this.groupEpoch = groupEpoch;
+            this.assignmentEpoch = 0;
+        }
+
+        /**
+         * Adds a member, keyed by its member id.
+         */
+        public ConsumerGroupBuilder withMember(ConsumerGroupMember member) {
+            members.put(member.memberId(), member);
+            return this;
+        }
+
+        /**
+         * Sets the target assignment of the given member.
+         */
+        public ConsumerGroupBuilder withAssignment(String memberId, Map<Uuid, Set<Integer>> assignment) {
+            assignments.put(memberId, new Assignment(assignment));
+            return this;
+        }
+
+        /**
+         * Sets the target assignment epoch; it is 0 unless set.
+         */
+        public ConsumerGroupBuilder withAssignmentEpoch(int assignmentEpoch) {
+            this.assignmentEpoch = assignmentEpoch;
+            return this;
+        }
+
+        /**
+         * Generates the records of the group in this order: member
+         * subscriptions, subscription metadata (only when at least one
+         * subscribed topic exists in the image), group epoch, target
+         * assignments, target assignment epoch and current assignments.
+         *
+         * @param topicsImage   The image used to look up the subscribed topics.
+         * @return The list of records.
+         */
+        public List<Record> build(TopicsImage topicsImage) {
+            List<Record> records = new ArrayList<>();
+
+            // One subscription record per member.
+            for (ConsumerGroupMember member : members.values()) {
+                records.add(RecordHelpers.newMemberSubscriptionRecord(groupId, member));
+            }
+
+            // Metadata of every subscribed topic known by the image.
+            Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
+            for (ConsumerGroupMember member : members.values()) {
+                member.subscribedTopicNames().forEach(topicName -> {
+                    TopicImage topicImage = topicsImage.getTopic(topicName);
+                    if (topicImage == null) {
+                        return;
+                    }
+                    TopicMetadata topicMetadata = new TopicMetadata(
+                        topicImage.id(),
+                        topicImage.name(),
+                        topicImage.partitions().size()
+                    );
+                    subscriptionMetadata.put(topicName, topicMetadata);
+                });
+            }
+
+            if (!subscriptionMetadata.isEmpty()) {
+                records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
+            }
+
+            // The group epoch.
+            records.add(RecordHelpers.newGroupEpochRecord(groupId, groupEpoch));
+
+            // One target assignment record per assigned member.
+            for (Map.Entry<String, Assignment> entry : assignments.entrySet()) {
+                records.add(RecordHelpers.newTargetAssignmentRecord(
+                    groupId,
+                    entry.getKey(),
+                    entry.getValue().partitions()
+                ));
+            }
+
+            // The target assignment epoch.
+            records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId, assignmentEpoch));
+
+            // One current assignment record per member.
+            for (ConsumerGroupMember member : members.values()) {
+                records.add(RecordHelpers.newCurrentAssignmentRecord(groupId, member));
+            }
+
+            return records;
+        }
+    }
+
+    static class GroupMetadataManagerTestContext {
+        /**
+         * Assembles a test context: creates the manager, replays the records
+         * of the configured consumer groups and commits them.
+         */
+        static class Builder {
+            private final LogContext logContext = new LogContext();
+            private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+            private TopicsImage topicsImage;
+            private List<PartitionAssignor> assignors;
+            private List<ConsumerGroupBuilder> consumerGroupBuilders = new ArrayList<>();
+            private int consumerGroupMaxSize = Integer.MAX_VALUE;
+
+            public Builder withTopicsImage(TopicsImage topicsImage) {
+                this.topicsImage = topicsImage;
+                return this;
+            }
+
+            public Builder withAssignors(List<PartitionAssignor> assignors) {
+                this.assignors = assignors;
+                return this;
+            }
+
+            public Builder withConsumerGroup(ConsumerGroupBuilder builder) {
+                consumerGroupBuilders.add(builder);
+                return this;
+            }
+
+            public Builder withConsumerGroupMaxSize(int consumerGroupMaxSize) {
+                this.consumerGroupMaxSize = consumerGroupMaxSize;
+                return this;
+            }
+
+            /**
+             * Builds the context. The topics image defaults to the empty image
+             * and the assignors to an empty list when they were not provided.
+             *
+             * @return The context, with the records of all configured groups
+             *         replayed and committed.
+             */
+            public GroupMetadataManagerTestContext build() {
+                if (topicsImage == null) {
+                    topicsImage = TopicsImage.EMPTY;
+                }
+                if (assignors == null) {
+                    assignors = Collections.emptyList();
+                }
+
+                GroupMetadataManager manager = new GroupMetadataManager.Builder()
+                    .withSnapshotRegistry(snapshotRegistry)
+                    .withLogContext(logContext)
+                    .withTopicsImage(topicsImage)
+                    .withConsumerGroupHeartbeatInterval(5000)
+                    .withConsumerGroupMaxSize(consumerGroupMaxSize)
+                    .withAssignors(assignors)
+                    .build();
+                GroupMetadataManagerTestContext context =
+                    new GroupMetadataManagerTestContext(snapshotRegistry, manager);
+
+                // Load the pre-existing groups, then make them the committed state.
+                for (ConsumerGroupBuilder groupBuilder : consumerGroupBuilders) {
+                    for (Record record : groupBuilder.build(topicsImage)) {
+                        context.replay(record);
+                    }
+                }
+                context.commit();
+
+                return context;
+            }
+        }
+
+        final SnapshotRegistry snapshotRegistry;
+        final GroupMetadataManager groupMetadataManager;
+
+        long lastCommittedOffset = 0L;
+        long lastWrittenOffset = 0L;
+
+        /**
+         * Creates a context around the provided snapshot registry and manager.
+         * Both offsets keep their initial value of 0.
+         *
+         * @param snapshotRegistry      The snapshot registry.
+         * @param groupMetadataManager  The manager under test.
+         */
+        public GroupMetadataManagerTestContext(
+            SnapshotRegistry snapshotRegistry,
+            GroupMetadataManager groupMetadataManager
+        ) {
+            this.snapshotRegistry = snapshotRegistry;
+            this.groupMetadataManager = groupMetadataManager;
+        }
+
+        /**
+         * Marks everything written so far as committed. Snapshots are deleted
+         * up to the offset which was committed before this call, not up to the
+         * newly committed one.
+         */
+        public void commit() {
+            final long previousCommittedOffset = lastCommittedOffset;
+            lastCommittedOffset = lastWrittenOffset;
+            snapshotRegistry.deleteSnapshotsUpTo(previousCommittedOffset);
+        }
+
+        /**
+         * Discards everything written since the last commit: the written
+         * offset is reset to the committed one and the registry is reverted to
+         * the snapshot taken at that offset.
+         */
+        public void rollback() {
+            final long committedOffset = lastCommittedOffset;
+            lastWrittenOffset = committedOffset;
+            snapshotRegistry.revertToSnapshot(committedOffset);
+        }
+
+        /**
+         * Returns the state of the given consumer group.
+         *
+         * @param groupId   The group id.
+         * @return The state of the group.
+         */
+        public ConsumerGroup.ConsumerGroupState consumerGroupState(
+            String groupId
+        ) {
+            // NOTE(review): false presumably means "do not create the group
+            // when it is missing" -- confirm against GroupMetadataManager.
+            return groupMetadataManager
+                .getOrMaybeCreateConsumerGroup(groupId, false)
+                .state();
+        }
+
+        /**
+         * Returns the state of the given member of the given consumer group.
+         *
+         * @param groupId   The group id.
+         * @param memberId  The member id.
+         * @return The state of the member.
+         */
+        public ConsumerGroupMember.MemberState consumerGroupMemberState(
+            String groupId,
+            String memberId
+        ) {
+            // NOTE(review): both lookups pass false, presumably to avoid
+            // creating a missing group or member -- confirm against the
+            // manager and ConsumerGroup.
+            return groupMetadataManager
+                .getOrMaybeCreateConsumerGroup(groupId, false)
+                .getOrMaybeCreateMember(memberId, false)
+                .state();
+        }
+
+        /**
+         * Sends a heartbeat to the manager and replays the records which it
+         * returns. A snapshot is first taken at the last committed offset,
+         * which is the snapshot rollback() reverts to.
+         *
+         * @param request   The heartbeat request.
+         * @return The result returned by the manager.
+         */
+        public Result<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
+            ConsumerGroupHeartbeatRequestData request
+        ) {
+            snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+
+            RequestHeader header = new RequestHeader(
+                ApiKeys.CONSUMER_GROUP_HEARTBEAT,
+                ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(),
+                "client",
+                0
+            );
+            RequestContext requestContext = new RequestContext(
+                header,
+                "1",
+                InetAddress.getLoopbackAddress(),
+                KafkaPrincipal.ANONYMOUS,
+                ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+                SecurityProtocol.PLAINTEXT,
+                ClientInformation.EMPTY,
+                false
+            );
+
+            Result<ConsumerGroupHeartbeatResponseData> heartbeatResult =
+                groupMetadataManager.consumerGroupHeartbeat(requestContext, request);
+
+            // Apply the generated records to the state of the manager.
+            heartbeatResult.records().forEach(this::replay);
+            return heartbeatResult;
+        }
+
+        /**
+         * Unwraps the message of the provided wrapper.
+         *
+         * @param apiMessageAndVersion  The wrapper or null.
+         * @return The wrapped message, or null when the wrapper is null.
+         */
+        private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
+            return apiMessageAndVersion == null ? null : apiMessageAndVersion.message();
+        }
+
+        /**
+         * Applies one record to the manager and advances the last written
+         * offset by one. The version of the key is used as the record type to
+         * select the replay overload to call.
+         *
+         * @param record    The record to replay.
+         * @throws IllegalStateException if the key is null or its version does
+         *                               not match any of the handled record types.
+         */
+        private void replay(
+            Record record
+        ) {
+            ApiMessageAndVersion key = record.key();
+            ApiMessageAndVersion value = record.value();
+
+            if (key == null) {
+                throw new IllegalStateException("Received a null key in " + record);
+            }
+
+            // A null value is handed over as a null message; presumably it
+            // denotes a tombstone -- confirm against the replay methods.
+            switch (key.version()) {
+                case ConsumerGroupMemberMetadataKey.HIGHEST_SUPPORTED_VERSION:
+                    groupMetadataManager.replay(
+                        (ConsumerGroupMemberMetadataKey) key.message(),
+                        (ConsumerGroupMemberMetadataValue) messageOrNull(value)
+                    );
+                    break;
+
+                case ConsumerGroupMetadataKey.HIGHEST_SUPPORTED_VERSION:
+                    groupMetadataManager.replay(
+                        (ConsumerGroupMetadataKey) key.message(),
+                        (ConsumerGroupMetadataValue) messageOrNull(value)
+                    );
+                    break;
+
+                case ConsumerGroupPartitionMetadataKey.HIGHEST_SUPPORTED_VERSION:
+                    groupMetadataManager.replay(
+                        (ConsumerGroupPartitionMetadataKey) key.message(),
+                        (ConsumerGroupPartitionMetadataValue) messageOrNull(value)
+                    );
+                    break;
+
+                case ConsumerGroupTargetAssignmentMemberKey.HIGHEST_SUPPORTED_VERSION:
+                    groupMetadataManager.replay(
+                        (ConsumerGroupTargetAssignmentMemberKey) key.message(),
+                        (ConsumerGroupTargetAssignmentMemberValue) messageOrNull(value)
+                    );
+                    break;
+
+                case ConsumerGroupTargetAssignmentMetadataKey.HIGHEST_SUPPORTED_VERSION:
+                    groupMetadataManager.replay(
+                        (ConsumerGroupTargetAssignmentMetadataKey) key.message(),
+                        (ConsumerGroupTargetAssignmentMetadataValue) messageOrNull(value)
+                    );
+                    break;
+
+                case ConsumerGroupCurrentMemberAssignmentKey.HIGHEST_SUPPORTED_VERSION:
+                    groupMetadataManager.replay(
+                        (ConsumerGroupCurrentMemberAssignmentKey) key.message(),
+                        (ConsumerGroupCurrentMemberAssignmentValue) messageOrNull(value)
+                    );
+                    break;
+
+                default:
+                    throw new IllegalStateException("Received an unknown record type " + key.version()
+                        + " in " + record);
+            }
+
+            // Only reached when the record was dispatched successfully.
+            lastWrittenOffset++;
+        }
+    }
+
+    /**
+     * Verifies that malformed heartbeat requests are rejected with the
+     * expected exception type and message. Each case adds the field whose
+     * absence made the previous case fail, or sets one invalid field.
+     */
+    @Test
+    public void testConsumerHeartbeatRequestValidation() {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        Exception ex;
+
+        // GroupId must be present in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()));
+        assertEquals("GroupId can't be empty.", ex.getMessage());
+
+        // RebalanceTimeoutMs must be present in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)));
+        assertEquals("RebalanceTimeoutMs must be provided in first request.", ex.getMessage());
+
+        // TopicPartitions must be present and empty in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)));
+        assertEquals("TopicPartitions must be empty when (re-)joining.", ex.getMessage());
+
+        // SubscribedTopicNames must be set in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setTopicPartitions(Collections.emptyList())));
+        assertEquals("SubscribedTopicNames must be set in first request.", ex.getMessage());
+
+        // MemberId must be non-empty in every request but the first one,
+        // i.e. whenever the epoch is not 0.
+        ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(1)));
+        assertEquals("MemberId can't be empty.", ex.getMessage());
+
+        // InstanceId, when provided, must be non-empty in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(Uuid.randomUuid().toString())
+                .setMemberEpoch(1)
+                .setInstanceId("")));
+        assertEquals("InstanceId can't be empty.", ex.getMessage());
+
+        // RackId, when provided, must be non-empty in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(Uuid.randomUuid().toString())
+                .setMemberEpoch(1)
+                .setRackId("")));
+        assertEquals("RackId can't be empty.", ex.getMessage());
+
+        // ServerAssignor, when provided, must be one of the configured assignors.
+        ex = assertThrows(UnsupportedAssignorException.class, () -> context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(Uuid.randomUuid().toString())
+                .setMemberEpoch(1)
+                .setServerAssignor("bar")));
+        assertEquals("ServerAssignor bar is not supported. Supported assignors: range.", ex.getMessage());
+    }
+
+    /**
+     * Verifies that a member joining without a member id gets one in the
+     * response.
+     */
+    @Test
+    public void testMemberIdGeneration() {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withTopicsImage(TopicsImage.EMPTY)
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+
+        // The joining request deliberately leaves the member id unset.
+        ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId("group-foo")
+            .setMemberEpoch(0)
+            .setServerAssignor("range")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setTopicPartitions(Collections.emptyList());
+
+        Result<ConsumerGroupHeartbeatResponseData> result = context.consumerGroupHeartbeat(request);
+
+        // A non-empty member id must have been generated.
+        String memberId = result.response().memberId();
+        assertNotNull(memberId);
+        assertNotEquals("", memberId);
+
+        // The epoch is bumped and the assignment is empty because the
+        // topics image does not contain any topic.
+        ConsumerGroupHeartbeatResponseData expectedResponse = new ConsumerGroupHeartbeatResponseData()
+            .setMemberId(memberId)
+            .setMemberEpoch(1)
+            .setHeartbeatIntervalMs(5000)
+            .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment());
+        assertEquals(expectedResponse, result.response());
+    }
+
+    /**
+     * Verifies that a heartbeat with a non-zero epoch for a group which does
+     * not exist fails with GroupIdNotFoundException.
+     */
+    @Test
+    public void testUnknownGroupId() {
+        String groupId = "fooup";
+        // Supply the member id ourselves, as the request needs a non-empty
+        // one when the epoch is not 0.
+        String memberId = Uuid.randomUuid().toString();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+
+        ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(100) // Epoch must be > 0.
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setTopicPartitions(Collections.emptyList());
+
+        assertThrows(GroupIdNotFoundException.class, () -> context.consumerGroupHeartbeat(request));
+    }
+
+    @Test
+    public void testUnknownMemberIdJoinsConsumerGroup() {
+        String groupId = "fooup";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+
+        // The first member joins with epoch 0, which creates the group.
+        ConsumerGroupHeartbeatRequestData firstMemberJoin = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(Uuid.randomUuid().toString())
+            .setMemberEpoch(0)
+            .setServerAssignor("range")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setTopicPartitions(Collections.emptyList());
+        context.consumerGroupHeartbeat(firstMemberJoin);
+
+        // A different, unknown member id heartbeats with a non-zero epoch:
+        // the request must be rejected.
+        ConsumerGroupHeartbeatRequestData unknownMemberHeartbeat = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(Uuid.randomUuid().toString())
+            .setMemberEpoch(1)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setTopicPartitions(Collections.emptyList());
+
+        assertThrows(UnknownMemberIdException.class, () ->
+            context.consumerGroupHeartbeat(unknownMemberHeartbeat));
+    }
+
+    @Test
+    public void testConsumerGroupMemberEpochValidation() {
+        String groupId = "fooup";
+        // One fixed member id is shared by every request below.
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+
+        // The member is at epoch 100 (previous epoch 99) and owns partitions 1, 2 and 3 of foo.
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setTargetMemberEpoch(100)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1, 2, 3)))
+            .build();
+
+        // Seed the state by replaying the records describing the group and its member.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId, member));
+        context.replay(RecordHelpers.newGroupEpochRecord(groupId, 100));
+        context.replay(RecordHelpers.newTargetAssignmentRecord(
+            groupId,
+            memberId,
+            mkAssignment(mkTopicAssignment(fooTopicId, 1, 2, 3))));
+        context.replay(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 100));
+        context.replay(RecordHelpers.newCurrentAssignmentRecord(groupId, member));
+
+        // Member epoch is greater than the expected epoch.
+        ConsumerGroupHeartbeatRequestData epochTooLarge = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(200)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"));
+        assertThrows(FencedMemberEpochException.class, () ->
+            context.consumerGroupHeartbeat(epochTooLarge));
+
+        // Member epoch is smaller than the expected epoch.
+        ConsumerGroupHeartbeatRequestData epochTooSmall = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(50)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"));
+        assertThrows(FencedMemberEpochException.class, () ->
+            context.consumerGroupHeartbeat(epochTooSmall));
+
+        // Member uses the previous epoch but does not report any owned partitions.
+        ConsumerGroupHeartbeatRequestData previousEpochNoPartitions = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"));
+        assertThrows(FencedMemberEpochException.class, () ->
+            context.consumerGroupHeartbeat(previousEpochNoPartitions));
+
+        // Member uses the previous epoch and reports a subset of its owned partitions.
+        // This is accepted because the response carrying the bumped epoch may have been
+        // lost; the correct epoch is handed back to the member.
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedSubset = Collections.singletonList(
+            new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                .setTopicId(fooTopicId)
+                .setPartitions(Arrays.asList(1, 2)));
+        ConsumerGroupHeartbeatRequestData previousEpochWithSubset = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setTopicPartitions(ownedSubset);
+
+        Result<ConsumerGroupHeartbeatResponseData> result =
+            context.consumerGroupHeartbeat(previousEpochWithSubset);
+        assertEquals(100, result.response().memberEpoch());
+    }
+
+    @Test
+    public void testMemberJoinsEmptyConsumerGroup() {
+        String groupId = "fooup";
+        // A fixed member id keeps the expected response and records easy to express.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withTopicsImage(new TopicsImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .build();
+
+        // The assignor gives every partition of both topics to the single member.
+        MemberAssignment fullAssignment = new MemberAssignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+            mkTopicAssignment(barTopicId, 0, 1, 2)
+        ));
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, fullAssignment)
+        ));
+
+        // The group does not exist before the first heartbeat.
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false));
+
+        ConsumerGroupHeartbeatRequestData joinRequest = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(0)
+            .setServerAssignor("range")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setTopicPartitions(Collections.emptyList());
+
+        Result<ConsumerGroupHeartbeatResponseData> result = context.consumerGroupHeartbeat(joinRequest);
+
+        // The response carries epoch 1 together with the whole assignment.
+        List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignedPartitions = Arrays.asList(
+            new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                .setTopicId(fooTopicId)
+                .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5)),
+            new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                .setTopicId(barTopicId)
+                .setPartitions(Arrays.asList(0, 1, 2))
+        );
+        ConsumerGroupHeartbeatResponseData expectedResponse = new ConsumerGroupHeartbeatResponseData()
+            .setMemberId(memberId)
+            .setMemberEpoch(1)
+            .setHeartbeatIntervalMs(5000)
+            .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                .setAssignedTopicPartitions(assignedPartitions));
+
+        assertResponseEquals(expectedResponse, result.response());
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(1)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 0, 1, 2)))
+            .build();
+
+        // Subscription metadata expected for the two subscribed topics.
+        HashMap<String, TopicMetadata> expectedSubscriptionMetadata = new HashMap<>();
+        expectedSubscriptionMetadata.put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
+        expectedSubscriptionMetadata.put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, expectedSubscriptionMetadata),
+            RecordHelpers.newGroupEpochRecord(groupId, 1),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 0, 1, 2)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 1),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testUpdatingSubscriptionTriggersNewTargetAssignment() {
+        String groupId = "fooup";
+        // A fixed member id keeps the expected response and records easy to express.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        // The existing member is at epoch 10, subscribed to foo only, and owns all of foo.
+        ConsumerGroupMember existingMember = new ConsumerGroupMember.Builder(memberId)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withTopicsImage(new TopicsImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(existingMember)
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // The next target assignment also includes every partition of bar.
+        MemberAssignment nextAssignment = new MemberAssignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+            mkTopicAssignment(barTopicId, 0, 1, 2)
+        ));
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, nextAssignment)
+        ));
+
+        // The member heartbeats at its current epoch with the subscription widened to foo and bar.
+        ConsumerGroupHeartbeatRequestData heartbeat = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(10)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"));
+
+        Result<ConsumerGroupHeartbeatResponseData> result = context.consumerGroupHeartbeat(heartbeat);
+
+        List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignedPartitions = Arrays.asList(
+            new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                .setTopicId(fooTopicId)
+                .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5)),
+            new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                .setTopicId(barTopicId)
+                .setPartitions(Arrays.asList(0, 1, 2))
+        );
+        ConsumerGroupHeartbeatResponseData expectedResponse = new ConsumerGroupHeartbeatResponseData()
+            .setMemberId(memberId)
+            .setMemberEpoch(11)
+            .setHeartbeatIntervalMs(5000)
+            .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                .setAssignedTopicPartitions(assignedPartitions));
+
+        assertResponseEquals(expectedResponse, result.response());
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 0, 1, 2)))
+            .build();
+
+        // Subscription metadata expected once bar is part of the subscription.
+        HashMap<String, TopicMetadata> expectedSubscriptionMetadata = new HashMap<>();
+        expectedSubscriptionMetadata.put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
+        expectedSubscriptionMetadata.put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, expectedSubscriptionMetadata),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 0, 1, 2)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testNewJoiningMemberTriggersNewTargetAssignment() {
+        // Scenario: a group at epoch 10 has two members that split foo and bar between
+        // them. A third member joins, and the test checks the heartbeat response and the
+        // records generated for the new target assignment.
+        String groupId = "fooup";
+        // Use fixed member ids as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withTopicsImage(new TopicsImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // The assignor will spread the partitions over the three members. A plain HashMap
+        // is used instead of double-brace initialization, which would create an anonymous
+        // subclass holding a reference to the test instance.
+        HashMap<String, MemberAssignment> nextAssignments = new HashMap<>();
+        nextAssignments.put(memberId1, new MemberAssignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1),
+            mkTopicAssignment(barTopicId, 0)
+        )));
+        nextAssignments.put(memberId2, new MemberAssignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 2, 3),
+            mkTopicAssignment(barTopicId, 1)
+        )));
+        nextAssignments.put(memberId3, new MemberAssignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 4, 5),
+            mkTopicAssignment(barTopicId, 2)
+        )));
+        assignor.prepareGroupAssignment(new GroupAssignment(nextAssignments));
+
+        // Member 3 joins the consumer group.
+        Result<ConsumerGroupHeartbeatResponseData> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setServerAssignor("range")
+                .setTopicPartitions(Collections.emptyList()));
+
+        // Member 3 moves to epoch 11 and its target partitions are reported as pending.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setPendingTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(4, 5)),
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(barTopicId)
+                            .setPartitions(Arrays.asList(2))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember3 = new ConsumerGroupMember.Builder(memberId3)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, expectedMember3),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1),
+                mkTopicAssignment(barTopicId, 0)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, mkAssignment(
+                mkTopicAssignment(fooTopicId, 2, 3),
+                mkTopicAssignment(barTopicId, 1)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
+        );
+
+        // The slice comparisons below only look at indices 0..6, so check the total
+        // count first; otherwise unexpected extra records would go unnoticed.
+        assertEquals(expectedRecords.size(), result.records().size());
+
+        // The three target assignment records (indices 2..4) are compared without
+        // regard to order; the records before and after them are compared in order.
+        assertRecordsEquals(expectedRecords.subList(0, 2), result.records().subList(0, 2));
+        assertUnorderedListEquals(expectedRecords.subList(2, 5), result.records().subList(2, 5));
+        assertRecordsEquals(expectedRecords.subList(5, 7), result.records().subList(5, 7));
+    }
+
+    @Test
+    public void testLeavingMemberBumpsGroupEpoch() {
+        String groupId = "fooup";
+        // Fixed member ids keep the expected response and records easy to express.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Uuid zarTopicId = Uuid.randomUuid();
+        String zarTopicName = "zar";
+
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            // Only this member subscribes to zar, so that the subscription metadata
+            // needs to be recomputed once it leaves.
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group at epoch 10 with the two members above.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withTopicsImage(new TopicsImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addTopic(zarTopicId, zarTopicName, 1)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Member 2 leaves the consumer group by heartbeating with epoch -1.
+        ConsumerGroupHeartbeatRequestData leaveRequest = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(-1)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setTopicPartitions(Collections.emptyList());
+
+        Result<ConsumerGroupHeartbeatResponseData> result = context.consumerGroupHeartbeat(leaveRequest);
+
+        ConsumerGroupHeartbeatResponseData expectedResponse = new ConsumerGroupHeartbeatResponseData()
+            .setMemberId(memberId2)
+            .setMemberEpoch(-1);
+        assertResponseEquals(expectedResponse, result.response());
+
+        // Subscription metadata is recomputed because zar is no longer there.
+        HashMap<String, TopicMetadata> remainingSubscriptionMetadata = new HashMap<>();
+        remainingSubscriptionMetadata.put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
+        remainingSubscriptionMetadata.put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
+
+        // Tombstones for the leaving member come first, then the new metadata and epoch.
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, remainingSubscriptionMetadata),
+            RecordHelpers.newGroupEpochRecord(groupId, 11)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testReconciliationProcess() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        // Create a context with one consumer group containing two members.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withTopicsImage(new TopicsImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Prepare new assignment for the group.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1),
+                        mkTopicAssignment(barTopicId, 0)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2, 3),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                    put(memberId3, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 4, 5),
+                        mkTopicAssignment(barTopicId, 1)
+                    )));
+                }
+            }
+        ));
+
+        Result<ConsumerGroupHeartbeatResponseData> result;
+
+        // Members in the group are in Stable state.
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId1));
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId2));
+        assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, context.consumerGroupState(groupId));
+
+        // Member 3 joins the group. This triggers the computation of a new target assignment
+        // for the group. Member 3 does not get any assigned partitions yet because they are
+        // all owned by other members. However, it transitions to epoch 11 / Assigning state.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setServerAssignor("range")
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setPendingTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(4, 5)),
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(barTopicId)
+                            .setPartitions(Arrays.asList(1))
+                    ))),
+            result.response()
+        );
+
+        // We only check the last record as the subscription/target assignment updates are
+        // already covered by other tests.
+        assertRecordEquals(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3)
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(0)
+                .setTargetMemberEpoch(11)
+                .setPartitionsPendingAssignment(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 4, 5),
+                    mkTopicAssignment(barTopicId, 1)))
+                .build()),
+            result.records().get(result.records().size() - 1)
+        );
+
+        assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, context.consumerGroupMemberState(groupId, memberId3));
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, context.consumerGroupState(groupId));
+
+        // Member 1 heartbeats. It remains at epoch 10 but transitions to Revoking state until
+        // it acknowledges the revocation of its partitions. The response contains the new
+        // assignment without the partitions that must be revoked.
+        result = context.consumerGroupHeartbeat(new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(10));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1)),
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(barTopicId)
+                            .setPartitions(Arrays.asList(0))
+                    ))),
+            result.response()
+        );
+
+        assertRecordsEquals(Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(9)
+                .setTargetMemberEpoch(11)
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1),
+                    mkTopicAssignment(barTopicId, 0)))
+                .setPartitionsPendingRevocation(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 2),
+                    mkTopicAssignment(barTopicId, 1)))
+                .build())),
+            result.records()
+        );
+
+        assertEquals(ConsumerGroupMember.MemberState.REVOKING, context.consumerGroupMemberState(groupId, memberId1));
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, context.consumerGroupState(groupId));
+
+        // Member 2 heartbeats. It remains at epoch 10 but transitions to Revoking state until
+        // it acknowledges the revocation of its partitions. The response contains the new
+        // assignment without the partitions that must be revoked.
+        result = context.consumerGroupHeartbeat(new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(10));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(3)),
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(barTopicId)
+                            .setPartitions(Arrays.asList(2))
+                    ))),
+            result.response()
+        );
+
+        assertRecordsEquals(Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId2)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(9)
+                .setTargetMemberEpoch(11)
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3),
+                    mkTopicAssignment(barTopicId, 2)))
+                .setPartitionsPendingRevocation(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 4, 5)))
+                .setPartitionsPendingAssignment(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 2)))
+                .build())),
+            result.records()
+        );
+
+        assertEquals(ConsumerGroupMember.MemberState.REVOKING, context.consumerGroupMemberState(groupId, memberId2));
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, context.consumerGroupState(groupId));
+
+        // Member 3 heartbeats. The response does not contain any assignment
+        // because the member is still waiting on other members to revoke partitions.
+        result = context.consumerGroupHeartbeat(new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId3)
+            .setMemberEpoch(11));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000),
+            result.response()
+        );
+
+        assertEquals(Collections.emptyList(), result.records());
+        assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, context.consumerGroupMemberState(groupId, memberId3));
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, context.consumerGroupState(groupId));
+
+        // Member 1 acknowledges the revocation of the partitions. It does so by providing the
+        // partitions that it still owns in the request. This allows it to transition to epoch 11
+        // and to the Stable state.
+        result = context.consumerGroupHeartbeat(new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(10)
+            .setTopicPartitions(Arrays.asList(
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(Arrays.asList(0, 1)),
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(barTopicId)
+                    .setPartitions(Arrays.asList(0))
+            )));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1)),
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(barTopicId)
+                            .setPartitions(Arrays.asList(0))
+                    ))),
+            result.response()
+        );
+
+        assertRecordsEquals(Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1)
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(10)
+                .setTargetMemberEpoch(11)
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1),
+                    mkTopicAssignment(barTopicId, 0)))
+                .build())),
+            result.records()
+        );
+
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId1));
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, context.consumerGroupState(groupId));
+
+        // Member 2 heartbeats but without acknowledging the revocation yet. This is basically a no-op.
+        result = context.consumerGroupHeartbeat(new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(10));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000),
+            result.response()
+        );
+
+        assertEquals(Collections.emptyList(), result.records());
+        assertEquals(ConsumerGroupMember.MemberState.REVOKING, context.consumerGroupMemberState(groupId, memberId2));
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, context.consumerGroupState(groupId));
+
+        // Member 3 heartbeats. It receives the partitions revoked by member 1 but remains
+        // in Assigning state because it still waits on other partitions.
+        result = context.consumerGroupHeartbeat(new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId3)
+            .setMemberEpoch(11));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(barTopicId)
+                            .setPartitions(Arrays.asList(1))))
+                    .setPendingTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(4, 5))))),
+            result.response()
+        );
+
+        assertRecordsEquals(Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3)
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(11)
+                .setTargetMemberEpoch(11)
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(barTopicId, 1)))
+                .setPartitionsPendingAssignment(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 4, 5)))
+                .build())),
+            result.records()
+        );
+
+        assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, context.consumerGroupMemberState(groupId, memberId3));
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, context.consumerGroupState(groupId));
+
+        // Member 3 heartbeats. Member 2 has not acknowledged the revocation of its partitions yet,
+        // so member 3 keeps its current assignment.
+        result = context.consumerGroupHeartbeat(new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId3)
+            .setMemberEpoch(11));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000),
+            result.response()
+        );
+
+        assertEquals(Collections.emptyList(), result.records());
+        assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, context.consumerGroupMemberState(groupId, memberId3));
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, context.consumerGroupState(groupId));
+
+        // Member 2 acknowledges the revocation of the partitions. It does so by providing the
+        // partitions that it still owns in the request. This allows it to transition to epoch 11
+        // and to the Stable state.
+        result = context.consumerGroupHeartbeat(new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(10)
+            .setTopicPartitions(Arrays.asList(
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(Arrays.asList(3)),
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(barTopicId)
+                    .setPartitions(Arrays.asList(2))
+            )));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(2, 3)),
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(barTopicId)
+                            .setPartitions(Arrays.asList(2))
+                    ))),
+            result.response()
+        );
+
+        assertRecordsEquals(Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId2)
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(10)
+                .setTargetMemberEpoch(11)
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 2, 3),
+                    mkTopicAssignment(barTopicId, 2)))
+                .build())),
+            result.records()
+        );
+
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId2));
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, context.consumerGroupState(groupId));
+
+        // Member 3 heartbeats. It receives all its partitions and transitions to Stable.
+        result = context.consumerGroupHeartbeat(new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId3)
+            .setMemberEpoch(11));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(4, 5)),
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(barTopicId)
+                            .setPartitions(Arrays.asList(1))))),
+            result.response()
+        );
+
+        assertRecordsEquals(Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3)
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(11)
+                .setTargetMemberEpoch(11)
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 4, 5),
+                    mkTopicAssignment(barTopicId, 1)))
+                .build())),
+            result.records()
+        );
+
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId3));
+        assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, context.consumerGroupState(groupId));
+    }
+
+    @Test
+    public void testReconciliationRestartsWhenNewTargetAssignmentIsInstalled() {
+        // Scenario exercised below, as pinned by the assertions:
+        //   1. The group starts at epoch 10 with member 1 owning foo partitions 0, 1 and 2.
+        //   2. Member 2 joins (epoch 11); member 1 heartbeats and ends up in Revoking for foo-2.
+        //   3. Before that round completes, member 3 joins (epoch 12) with a different
+        //      prepared assignment.
+        //   4. Members 1 and 2 heartbeat again and their current-assignment records are
+        //      rewritten against target epoch 12.
+        String groupId = "fooup";
+        // Member ids are random UUIDs generated up front and reused in the requests below.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member. The topics image
+        // only contains "foo" (6 partitions); member 1 is at epoch 10 and both its current
+        // and its target assignment are foo partitions 0, 1 and 2.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withTopicsImage(new TopicsImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Holds the outcome (response + records) of the most recent heartbeat.
+        Result<ConsumerGroupHeartbeatResponseData> result;
+
+        // Prepare the first new assignment on the mock assignor:
+        // member 1 -> foo 0, 1 and member 2 -> foo 2.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 2 joins with epoch 0. It is expected to come back at epoch 11 with foo-2
+        // reported only as pending (member 1 still has foo-2 assigned in the initial state).
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setServerAssignor("range")
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setPendingTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(2))
+                    ))),
+            result.response()
+        );
+
+        // Only the last generated record (member 2's current assignment) is verified here.
+        assertRecordEquals(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId2)
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(0)
+                .setTargetMemberEpoch(11)
+                .setPartitionsPendingAssignment(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 2)))
+                .build()),
+            result.records().get(result.records().size() - 1)
+        );
+
+        assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, context.consumerGroupMemberState(groupId, memberId2));
+
+        // Member 1 heartbeats and transitions to Revoking. It stays at epoch 10 with target
+        // epoch 11, keeps foo 0, 1 and has foo 2 pending revocation.
+        result = context.consumerGroupHeartbeat(new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(10));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1))))),
+            result.response()
+        );
+
+        assertRecordsEquals(Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(9)
+                .setTargetMemberEpoch(11)
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1)))
+                .setPartitionsPendingRevocation(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 2)))
+                .build())),
+            result.records()
+        );
+
+        assertEquals(ConsumerGroupMember.MemberState.REVOKING, context.consumerGroupMemberState(groupId, memberId1));
+
+        // Prepare a second assignment while member 1 is still Revoking and member 2 is still
+        // Assigning: member 1 -> foo 0, member 2 -> foo 2, member 3 -> foo 1.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2)
+                    )));
+                    put(memberId3, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 1)
+                    )));
+                }
+            }
+        ));
+
+        // Member 3 joins with epoch 0. It is expected to come back at epoch 12 with foo-1
+        // reported only as pending.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setServerAssignor("range")
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(12)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setPendingTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(1))
+                    ))),
+            result.response()
+        );
+
+        // Again, only the last generated record (member 3's current assignment) is verified.
+        assertRecordEquals(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3)
+                .setMemberEpoch(12)
+                .setPreviousMemberEpoch(0)
+                .setTargetMemberEpoch(12)
+                .setPartitionsPendingAssignment(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 1)))
+                .build()),
+            result.records().get(result.records().size() - 1)
+        );
+
+        assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, context.consumerGroupMemberState(groupId, memberId3));
+
+        // When member 1 heartbeats (still at epoch 10), it is in Revoking again but with an
+        // updated state: target epoch 12, foo 0 assigned, foo 1 and 2 pending revocation.
+        result = context.consumerGroupHeartbeat(new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(10));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0))))),
+            result.response()
+        );
+
+        assertRecordsEquals(Collections.singletonList(
+                RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(12)
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0)))
+                    .setPartitionsPendingRevocation(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 1, 2)))
+                    .build())),
+            result.records()
+        );
+
+        assertEquals(ConsumerGroupMember.MemberState.REVOKING, context.consumerGroupMemberState(groupId, memberId1));
+
+        // When member 2 heartbeats at epoch 11, it moves to epoch 12 and is in Assigning again
+        // with an updated state: foo 2 is still pending assignment.
+        result = context.consumerGroupHeartbeat(new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(11));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(12)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setPendingTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(2))))),
+            result.response()
+        );
+
+        assertRecordsEquals(Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId2)
+                .setMemberEpoch(12)
+                .setPreviousMemberEpoch(11)
+                .setTargetMemberEpoch(12)
+                .setPartitionsPendingAssignment(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 2)))
+                .build())),
+            result.records()
+        );
+
+        assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, context.consumerGroupMemberState(groupId, memberId2));
+    }
+
+    @Test
+    public void testNewMemberIsRejectedWithMaximumMembersIsReached() {
+        // A group capped at two members already holds two members; a third member that
+        // tries to join must be refused with GroupMaxSizeReachedException.
+        String groupId = "fooup";
+
+        // Random ids: two for the members already in the group, one for the joiner.
+        String firstMemberId = Uuid.randomUuid().toString();
+        String secondMemberId = Uuid.randomUuid().toString();
+        String joiningMemberId = Uuid.randomUuid().toString();
+
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+
+        // First existing member: epoch 10, owns foo 0-2 and bar 0-1.
+        ConsumerGroupMember firstMember = new ConsumerGroupMember.Builder(firstMemberId)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+
+        // Second existing member: epoch 10, owns foo 3-5 and bar 2.
+        ConsumerGroupMember secondMember = new ConsumerGroupMember.Builder(secondMemberId)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // The group at epoch 10; target assignments mirror what each member currently owns.
+        ConsumerGroupBuilder fullGroup = new ConsumerGroupBuilder(groupId, 10)
+            .withMember(firstMember)
+            .withMember(secondMember)
+            .withAssignment(firstMemberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .withAssignment(secondMemberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .withAssignmentEpoch(10);
+
+        // Build the context with the maximum group size set to the current member count.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withTopicsImage(new TopicsImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withConsumerGroupMaxSize(2)
+            .withConsumerGroup(fullGroup)
+            .build();
+
+        // Join request (member epoch 0) from a member that is not yet part of the group.
+        ConsumerGroupHeartbeatRequestData joinRequest = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(joiningMemberId)
+            .setMemberEpoch(0)
+            .setServerAssignor("range")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName))
+            .setTopicPartitions(Collections.emptyList());
+
+        assertThrows(
+            GroupMaxSizeReachedException.class,
+            () -> context.consumerGroupHeartbeat(joinRequest)
+        );
+    }
+
+    @Test
+    public void testConsumerGroupStates() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
+            .build();
+        // No member has been added to the group so far.
+        assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, context.consumerGroupState(groupId));
+        // A member subscription is replayed and the group epoch is set to 11.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId, new ConsumerGroupMember.Builder(memberId1)
+            .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+            .build()));
+        context.replay(RecordHelpers.newGroupEpochRecord(groupId, 11));
+
+        assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, context.consumerGroupState(groupId));
+        // A target assignment is replayed for the member together with target assignment epoch 11.
+        context.replay(RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, mkAssignment(
+            mkTopicAssignment(fooTopicId, 1, 2, 3))));
+        context.replay(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11));
+
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, context.consumerGroupState(groupId));
+        // The member reaches epoch 11 but partition 3 of foo is still pending assignment.
+        context.replay(RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setTargetMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1, 2)))
+            .setPartitionsPendingAssignment(mkAssignment(mkTopicAssignment(fooTopicId, 3)))
+            .build()));
+
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, context.consumerGroupState(groupId));
+        // The member now owns all three partitions and nothing is left pending.
+        context.replay(RecordHelpers.newCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setTargetMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 1, 2, 3)))
+            .build()));
+
+        assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, context.consumerGroupState(groupId));
+    }
+
+    @Test
+    public void testPartitionAssignorExceptionOnRegularHeartbeat() {
+        String groupId = "fooup";
+        // Generate the member id upfront as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        // The mocked assignor is registered as "range" and fails every assignment request.
+        PartitionAssignor assignor = mock(PartitionAssignor.class);
+        when(assignor.name()).thenReturn("range");
+        when(assignor.assign(any())).thenThrow(new PartitionAssignorException("Assignment failed."));
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withTopicsImage(new TopicsImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .build();
+
+        // Member 1 joins the consumer group. The request fails because the
+        // target assignment computation failed.
+        assertThrows(UnknownServerException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId1)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignor("range")
+                    .setTopicPartitions(Collections.emptyList())));
+    }
+
+    /** Asserts that both lists contain the same distinct elements; order and duplicates are ignored. */
+    private <T> void assertUnorderedListEquals(List<T> expected, List<T> actual) {
+        Set<T> expectedElements = new HashSet<>(expected);
+        Set<T> actualElements = new HashSet<>(actual);
+        assertEquals(expectedElements, actualElements);
+    }
+
+    /**
+     * Fails, reporting both responses, unless they are equal according to responseEquals,
+     * which compares the assignments regardless of their order.
+     */
+    private void assertResponseEquals(ConsumerGroupHeartbeatResponseData expected, ConsumerGroupHeartbeatResponseData actual) {
+        if (responseEquals(expected, actual)) return;
+        assertionFailure()
+            .expected(expected)
+            .actual(actual)
+            .buildAndThrow();
+    }
+
+    /** Compares two heartbeat responses field by field; the assignments are compared regardless of their order. */
+    private boolean responseEquals(ConsumerGroupHeartbeatResponseData expected, ConsumerGroupHeartbeatResponseData actual) {
+        boolean sameStatus = expected.throttleTimeMs() == actual.throttleTimeMs()
+            && expected.errorCode() == actual.errorCode()
+            && Objects.equals(expected.errorMessage(), actual.errorMessage());
+        if (!sameStatus) return false;
+
+        boolean sameMember = Objects.equals(expected.memberId(), actual.memberId())
+            && expected.memberEpoch() == actual.memberEpoch()
+            && expected.shouldComputeAssignment() == actual.shouldComputeAssignment()
+            && expected.heartbeatIntervalMs() == actual.heartbeatIntervalMs();
+        // Unordered comparison of the assignments.
+        return sameMember && responseAssignmentEquals(expected.assignment(), actual.assignment());
+    }
+
+    /**
+     * Compares the pending and the assigned partitions of two response assignments through fromAssignment.
+     */
+    private boolean responseAssignmentEquals(
+        ConsumerGroupHeartbeatResponseData.Assignment expected,
+        ConsumerGroupHeartbeatResponseData.Assignment actual
+    ) {
+        if (expected == actual) return true;
+        if (expected == null || actual == null) return false;
+
+        return Objects.equals(fromAssignment(expected.pendingTopicPartitions()), fromAssignment(actual.pendingTopicPartitions()))
+            && Objects.equals(fromAssignment(expected.assignedTopicPartitions()), fromAssignment(actual.assignedTopicPartitions()));
+    }
+
+    // Indexes the partitions of a response assignment by topic id; a null list is returned as null.
+    private Map<Uuid, Set<Integer>> fromAssignment(List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment) {
+        if (assignment == null) return null;
+
+        // Sets are used so that the order of the partitions does not matter when comparing.
+        Map<Uuid, Set<Integer>> partitionsByTopic = new HashMap<>();
+        for (ConsumerGroupHeartbeatResponseData.TopicPartitions topicPartitions : assignment) {
+            partitionsByTopic.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions()));
+        }
+        return partitionsByTopic;
+    }
+
+    /**
+     * Asserts that both lists hold equal records at the same positions. Any mismatch is
+     * reported with the complete lists, and the underlying failure is kept as the cause.
+     */
+    private void assertRecordsEquals(List<Record> expectedRecords, List<Record> actualRecords) {
+        try {
+            assertEquals(expectedRecords.size(), actualRecords.size());
+
+            for (int i = 0; i < expectedRecords.size(); i++) {
+                assertRecordEquals(expectedRecords.get(i), actualRecords.get(i));
+            }
+        } catch (AssertionFailedError e) {
+            assertionFailure()
+                .expected(expectedRecords)
+                .actual(actualRecords)
+                .cause(e)
+                .buildAndThrow();
+        }
+    }
+
+    /** Asserts that the keys and the values of both records match; the first failure is kept as the cause. */
+    private void assertRecordEquals(Record expected, Record actual) {
+        try {
+            assertApiMessageAndVersionEquals(expected.key(), actual.key());
+            assertApiMessageAndVersionEquals(expected.value(), actual.value());
+        } catch (AssertionFailedError e) {
+            // Report the whole records while keeping the field-level failure in the stack trace.
+            assertionFailure()
+                .expected(expected)
+                .actual(actual)
+                .cause(e)
+                .buildAndThrow();
+        }
+    }
+
+    /** Asserts that two keys or values match; topics of a current member assignment are compared in any order. */
+    private void assertApiMessageAndVersionEquals(ApiMessageAndVersion expected, ApiMessageAndVersion actual) {
+        if (expected == actual) return;
+        // Exactly one side is null here: fail as an assertion rather than with an NPE below.
+        if (expected == null || actual == null) assertEquals(expected, actual);
+        assertEquals(expected.version(), actual.version());
+
+        if (expected.message() instanceof ConsumerGroupCurrentMemberAssignmentValue
+            && actual.message() instanceof ConsumerGroupCurrentMemberAssignmentValue) {
+            // The order of the topics stored in ConsumerGroupCurrentMemberAssignmentValue is not
+            // always guaranteed. Therefore, we need a special comparator.
+            ConsumerGroupCurrentMemberAssignmentValue expectedValue =
+                (ConsumerGroupCurrentMemberAssignmentValue) expected.message();
+            ConsumerGroupCurrentMemberAssignmentValue actualValue =
+                (ConsumerGroupCurrentMemberAssignmentValue) actual.message();
+
+            assertEquals(expectedValue.memberEpoch(), actualValue.memberEpoch());
+            assertEquals(expectedValue.previousMemberEpoch(), actualValue.previousMemberEpoch());
+            assertEquals(expectedValue.targetMemberEpoch(), actualValue.targetMemberEpoch());
+            assertEquals(expectedValue.error(), actualValue.error());
+            assertEquals(expectedValue.metadataVersion(), actualValue.metadataVersion());
+            assertEquals(expectedValue.metadataBytes(), actualValue.metadataBytes());
+
+            // We transform those to Maps before comparing them.
+            assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()),
+                fromTopicPartitions(actualValue.assignedPartitions()));
+            assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()),
+                fromTopicPartitions(actualValue.partitionsPendingRevocation()));
+            assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()),
+                fromTopicPartitions(actualValue.partitionsPendingAssignment()));
+        } else {
+            assertEquals(expected.message(), actual.message());
+        }
+    }
+
+    // Indexes the partitions of a current member assignment by topic id. Sets are used so
+    // that the order of the partitions does not matter when comparing.
+    private Map<Uuid, Set<Integer>> fromTopicPartitions(List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> assignment) {
+        Map<Uuid, Set<Integer>> partitionsByTopic = new HashMap<>();
+        for (ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions topicPartitions : assignment) {
+            partitionsByTopic.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions()));
+        }
+        return partitionsByTopic;
+    }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
index 40b6ddaedcc..cfa2d600f7c 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java
@@ -378,7 +378,7 @@ public class RecordHelpersTest {
             new ConsumerGroupMember.Builder("member-id")
                 .setMemberEpoch(22)
                 .setPreviousMemberEpoch(21)
-                .setNextMemberEpoch(23)
+                .setTargetMemberEpoch(23)
                 .setAssignedPartitions(assigned)
                 .setPartitionsPendingRevocation(revoking)
                 .setPartitionsPendingAssignment(assigning)
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java
index e98a895d2be..13ac57bb2b0 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java
@@ -42,7 +42,7 @@ public class ConsumerGroupMemberTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(9)
-            .setNextMemberEpoch(11)
+            .setTargetMemberEpoch(11)
             .setInstanceId("instance-id")
             .setRackId("rack-id")
             .setRebalanceTimeoutMs(5000)
@@ -71,7 +71,7 @@ public class ConsumerGroupMemberTest {
         assertEquals("member-id", member.memberId());
         assertEquals(10, member.memberEpoch());
         assertEquals(9, member.previousMemberEpoch());
-        assertEquals(11, member.nextMemberEpoch());
+        assertEquals(11, member.targetMemberEpoch());
         assertEquals("instance-id", member.instanceId());
         assertEquals("rack-id", member.rackId());
         assertEquals("client-id", member.clientId());
@@ -105,7 +105,7 @@ public class ConsumerGroupMemberTest {
         ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member-id")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(9)
-            .setNextMemberEpoch(11)
+            .setTargetMemberEpoch(11)
             .setInstanceId("instance-id")
             .setRackId("rack-id")
             .setRebalanceTimeoutMs(5000)
@@ -134,7 +134,7 @@ public class ConsumerGroupMemberTest {
         ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member-id")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(9)
-            .setNextMemberEpoch(11)
+            .setTargetMemberEpoch(11)
             .setInstanceId("instance-id")
             .setRackId("rack-id")
             .setRebalanceTimeoutMs(5000)
@@ -172,7 +172,7 @@ public class ConsumerGroupMemberTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(9)
-            .setNextMemberEpoch(11)
+            .setTargetMemberEpoch(11)
             .setInstanceId("instance-id")
             .setRackId("rack-id")
             .setRebalanceTimeoutMs(5000)
@@ -299,7 +299,7 @@ public class ConsumerGroupMemberTest {
 
         assertEquals(10, member.memberEpoch());
         assertEquals(9, member.previousMemberEpoch());
-        assertEquals(11, member.nextMemberEpoch());
+        assertEquals(11, member.targetMemberEpoch());
         assertEquals(mkAssignment(mkTopicAssignment(topicId1, 0, 1, 2)), member.assignedPartitions());
         assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)), member.partitionsPendingRevocation());
         assertEquals(mkAssignment(mkTopicAssignment(topicId3, 6, 7, 8)), member.partitionsPendingAssignment());
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
new file mode 100644
index 00000000000..2454188ed94
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerGroupTest {
+
+    private ConsumerGroup createConsumerGroup(String groupId) {
+        // Every group is backed by its own, freshly created snapshot registry.
+        return new ConsumerGroup(new SnapshotRegistry(new LogContext()), groupId);
+    }
+
+    @Test
+    public void testGetOrCreateMember() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        ConsumerGroupMember member;
+
+        // Create a member.
+        member = consumerGroup.getOrMaybeCreateMember("member-id", true);
+        assertEquals("member-id", member.memberId());
+
+        // Get that member back.
+        member = consumerGroup.getOrMaybeCreateMember("member-id", false);
+        assertEquals("member-id", member.memberId());
+        // An unknown member id is rejected when creation is not requested.
+        assertThrows(UnknownMemberIdException.class, () ->
+            consumerGroup.getOrMaybeCreateMember("does-not-exist", false));
+    }
+
+    @Test
+    public void testUpdateMember() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        // Create the member first so that there is something to update.
+        ConsumerGroupMember created = consumerGroup.getOrMaybeCreateMember("member", true);
+
+        ConsumerGroupMember updated = new ConsumerGroupMember.Builder(created)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .build();
+
+        // Once stored, the updated member must be the one returned by the group.
+        consumerGroup.updateMember(updated);
+
+        assertEquals(updated, consumerGroup.getOrMaybeCreateMember("member", false));
+    }
+
+    @Test
+    public void testRemoveMember() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        // The member is known to the group once it has been created.
+        consumerGroup.getOrMaybeCreateMember("member", true);
+        assertTrue(consumerGroup.hasMember("member"));
+
+        // After the removal, the group no longer reports the member.
+        consumerGroup.removeMember("member");
+        assertFalse(consumerGroup.hasMember("member"));
+    }
+
+    @Test
+    public void testUpdatingMemberUpdatesPartitionEpoch() {
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        Uuid zarTopicId = Uuid.randomUuid();
+
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        ConsumerGroupMember member;
+
+        member = new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 1, 2, 3)))
+            .setPartitionsPendingRevocation(mkAssignment(
+                mkTopicAssignment(barTopicId, 4, 5, 6)))
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(zarTopicId, 7, 8, 9)))
+            .build();
+
+        consumerGroup.updateMember(member);
+        // Assigned and pending-revocation partitions are at the member epoch; pending assignments are reported as -1.
+        assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+        assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2));
+        assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3));
+        assertEquals(10, consumerGroup.currentPartitionEpoch(barTopicId, 4));
+        assertEquals(10, consumerGroup.currentPartitionEpoch(barTopicId, 5));
+        assertEquals(10, consumerGroup.currentPartitionEpoch(barTopicId, 6));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(zarTopicId, 7));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(zarTopicId, 8));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(zarTopicId, 9));
+        // Move the member to epoch 11 with a different set of partitions in every category.
+        member = new ConsumerGroupMember.Builder(member)
+            .setMemberEpoch(11)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(barTopicId, 1, 2, 3)))
+            .setPartitionsPendingRevocation(mkAssignment(
+                mkTopicAssignment(zarTopicId, 4, 5, 6)))
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 7, 8, 9)))
+            .build();
+
+        consumerGroup.updateMember(member);
+        // The new assigned and pending-revocation partitions are at epoch 11; the new pending assignments are at -1.
+        assertEquals(11, consumerGroup.currentPartitionEpoch(barTopicId, 1));
+        assertEquals(11, consumerGroup.currentPartitionEpoch(barTopicId, 2));
+        assertEquals(11, consumerGroup.currentPartitionEpoch(barTopicId, 3));
+        assertEquals(11, consumerGroup.currentPartitionEpoch(zarTopicId, 4));
+        assertEquals(11, consumerGroup.currentPartitionEpoch(zarTopicId, 5));
+        assertEquals(11, consumerGroup.currentPartitionEpoch(zarTopicId, 6));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(fooTopicId, 7));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(fooTopicId, 8));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(fooTopicId, 9));
+    }
+
+    @Test
+    public void testDeletingMemberRemovesPartitionEpoch() {
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        Uuid zarTopicId = Uuid.randomUuid();
+
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        ConsumerGroupMember member;
+
+        member = new ConsumerGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 1, 2, 3)))
+            .setPartitionsPendingRevocation(mkAssignment(
+                mkTopicAssignment(barTopicId, 4, 5, 6)))
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(zarTopicId, 7, 8, 9)))
+            .build();
+
+        consumerGroup.updateMember(member);
+        // Assigned and pending-revocation partitions are at the member epoch; pending assignments are reported as -1.
+        assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+        assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2));
+        assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3));
+        assertEquals(10, consumerGroup.currentPartitionEpoch(barTopicId, 4));
+        assertEquals(10, consumerGroup.currentPartitionEpoch(barTopicId, 5));
+        assertEquals(10, consumerGroup.currentPartitionEpoch(barTopicId, 6));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(zarTopicId, 7));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(zarTopicId, 8));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(zarTopicId, 9));
+
+        consumerGroup.removeMember(member.memberId());
+        // Check the very same partitions again: none of them may keep an epoch once the member is gone.
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(fooTopicId, 2));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(fooTopicId, 3));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(barTopicId, 4));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(barTopicId, 5));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(barTopicId, 6));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(zarTopicId, 7));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(zarTopicId, 8));
+        assertEquals(-1, consumerGroup.currentPartitionEpoch(zarTopicId, 9));
+    }
+
+    @Test
+    public void testGroupState() {
+        Uuid fooTopicId = Uuid.randomUuid();
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, consumerGroup.state());
+
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member1")
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(1)
+            .build();
+
+        consumerGroup.updateMember(member1);
+        consumerGroup.setGroupEpoch(1);
+        // The group has a member and a group epoch but no target assignment epoch has been set.
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, member1.state());
+        assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, consumerGroup.state());
+
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member2")
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(1)
+            .build();
+
+        consumerGroup.updateMember(member2);
+        consumerGroup.setGroupEpoch(2);
+        // Still no target assignment epoch, so the group stays in assigning state.
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, member2.state());
+        assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, consumerGroup.state());
+
+        consumerGroup.setTargetAssignmentEpoch(2);
+        // The target assignment epoch now matches the group epoch (2).
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, consumerGroup.state());
+
+        member1 = new ConsumerGroupMember.Builder(member1)
+            .setMemberEpoch(2)
+            .setPreviousMemberEpoch(1)
+            .setTargetMemberEpoch(2)
+            .build();
+
+        consumerGroup.updateMember(member1);
+        // Member 1 reached epoch 2 but member 2 is still at epoch 1.
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, member1.state());
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, consumerGroup.state());
+
+        // Member 2 is not stable so the group stays in reconciling state.
+        member2 = new ConsumerGroupMember.Builder(member2)
+            .setMemberEpoch(2)
+            .setPreviousMemberEpoch(1)
+            .setTargetMemberEpoch(2)
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0)))
+            .build();
+
+        consumerGroup.updateMember(member2);
+
+        assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, member2.state());
+        assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, consumerGroup.state());
+
+        member2 = new ConsumerGroupMember.Builder(member2)
+            .setMemberEpoch(2)
+            .setPreviousMemberEpoch(1)
+            .setTargetMemberEpoch(2)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0)))
+            .setPartitionsPendingAssignment(Collections.emptyMap())
+            .build();
+
+        consumerGroup.updateMember(member2);
+        // Both members are stable at epoch 2 so the group is stable as well.
+        assertEquals(ConsumerGroupMember.MemberState.STABLE, member2.state());
+        assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, consumerGroup.state());
+
+        consumerGroup.removeMember("member1");
+        consumerGroup.removeMember("member2");
+        // Without members, the group is empty again.
+        assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, consumerGroup.state());
+    }
+
+    @Test
+    public void testPreferredServerAssignor() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member1")
+            .setServerAssignorName("range")
+            .build();
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member2")
+            .setServerAssignorName("range")
+            .build();
+        ConsumerGroupMember member3 = new ConsumerGroupMember.Builder("member3")
+            .setServerAssignorName("uniform")
+            .build();
+
+        // The group is empty so the preferred assignor should be empty.
+        assertEquals(
+            Optional.empty(),
+            consumerGroup.preferredServerAssignor()
+        );
+
+        // Member 1 has got an updated assignor but this is not reflected in the group yet so
+        // we pass the updated member. The assignor should be range.
+        assertEquals(
+            Optional.of("range"),
+            consumerGroup.computePreferredServerAssignor(null, member1)
+        );
+
+        // Update the group with member 1.
+        consumerGroup.updateMember(member1);
+
+        // Member 1 is in the group so the assignor should be range.
+        assertEquals(
+            Optional.of("range"),
+            consumerGroup.preferredServerAssignor()
+        );
+
+        // Member 1 has been removed but this is not reflected in the group yet so
+        // we pass the removed member. The assignor should be empty.
+        assertEquals(
+            Optional.empty(),
+            consumerGroup.computePreferredServerAssignor(member1, null)
+        );
+
+        // Member 2 has got an updated assignor but this is not reflected in the group yet so
+        // we pass the updated member. The assignor should be range.
+        assertEquals(
+            Optional.of("range"),
+            consumerGroup.computePreferredServerAssignor(null, member2)
+        );
+
+        // Update the group with member 2.
+        consumerGroup.updateMember(member2);
+
+        // Member 1 and 2 are in the group so the assignor should be range.
+        assertEquals(
+            Optional.of("range"),
+            consumerGroup.preferredServerAssignor()
+        );
+
+        // Update the group with member 3.
+        consumerGroup.updateMember(member3);
+
+        // Member 1, 2 and 3 are in the group so the assignor should be range.
+        assertEquals(
+            Optional.of("range"),
+            consumerGroup.preferredServerAssignor()
+        );
+
+        // Members without assignors
+        ConsumerGroupMember updatedMember1 = new ConsumerGroupMember.Builder("member1")
+            .setServerAssignorName(null)
+            .build();
+        ConsumerGroupMember updatedMember2 = new ConsumerGroupMember.Builder("member2")
+            .setServerAssignorName(null)
+            .build();
+        ConsumerGroupMember updatedMember3 = new ConsumerGroupMember.Builder("member3")
+            .setServerAssignorName(null)
+            .build();
+
+        // Member 1 has removed its assignor but this is not reflected in the group yet so
+        // we pass the updated member. The assignor should be range or uniform.
+        Optional<String> assignor = consumerGroup.computePreferredServerAssignor(member1, updatedMember1);
+        assertTrue(assignor.equals(Optional.of("range")) || assignor.equals(Optional.of("uniform")));
+
+        // Update the group.
+        consumerGroup.updateMember(updatedMember1);
+
+        // Member 2 has removed its assignor but this is not reflected in the group yet so
+        // we pass the updated member. The assignor should be uniform.
+        assertEquals(
+            Optional.of("uniform"),
+            consumerGroup.computePreferredServerAssignor(member2, updatedMember2)
+        );
+
+        // Update the group.
+        consumerGroup.updateMember(updatedMember2);
+
+        // Only member 3 still has an assignor so the assignor should be uniform.
+        assertEquals(
+            Optional.of("uniform"),
+            consumerGroup.preferredServerAssignor()
+        );
+
+        // Member 3 has removed its assignor but this is not reflected in the group yet so
+        // we pass the updated member. The assignor should be empty.
+        assertEquals(
+            Optional.empty(),
+            consumerGroup.computePreferredServerAssignor(member3, updatedMember3)
+        );
+
+        // Update the group.
+        consumerGroup.updateMember(updatedMember3);
+
+        // No member has an assignor anymore so the preferred assignor should be empty.
+        assertEquals(
+            Optional.empty(),
+            consumerGroup.preferredServerAssignor()
+        );
+    }
+
+    @Test
+    public void testUpdateSubscriptionMetadata() {
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        Uuid zarTopicId = Uuid.randomUuid();
+
+        TopicsImage image = new GroupMetadataManagerTest.TopicsImageBuilder()
+            .addTopic(fooTopicId, "foo", 1)
+            .addTopic(barTopicId, "bar", 2)
+            .addTopic(zarTopicId, "zar", 3)
+            .build();
+
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member1")
+            .setSubscribedTopicNames(Arrays.asList("foo"))
+            .build();
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member2")
+            .setSubscribedTopicNames(Arrays.asList("bar"))
+            .build();
+        ConsumerGroupMember member3 = new ConsumerGroupMember.Builder("member3")
+            .setSubscribedTopicNames(Arrays.asList("zar"))
+            .build();
+
+        ConsumerGroup consumerGroup = createConsumerGroup("group-foo");
+
+        // No member is in the group and none is passed in, so the result should be empty.
+        assertEquals(
+            Collections.emptyMap(),
+            consumerGroup.computeSubscriptionMetadata(
+                null,
+                null,
+                image
+            )
+        );
+
+        // Pass member 1 in the second position (not yet in the group); foo should be included.
+        assertEquals(
+            mkMap(
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
+            ),
+            consumerGroup.computeSubscriptionMetadata(
+                null,
+                member1,
+                image
+            )
+        );
+
+        // Add member 1 to the group.
+        consumerGroup.updateMember(member1);
+
+        // With no member passed in, the group alone should now yield foo.
+        assertEquals(
+            mkMap(
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
+            ),
+            consumerGroup.computeSubscriptionMetadata(
+                null,
+                null,
+                image
+            )
+        );
+
+        // Pass member 1 in the first position only to model its removal; foo should be dropped.
+        assertEquals(
+            Collections.emptyMap(),
+            consumerGroup.computeSubscriptionMetadata(
+                member1,
+                null,
+                image
+            )
+        );
+
+        // Pass member 2 in the second position (not yet in the group); bar should join foo.
+        assertEquals(
+            mkMap(
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
+            ),
+            consumerGroup.computeSubscriptionMetadata(
+                null,
+                member2,
+                image
+            )
+        );
+
+        // Add member 2 to the group.
+        consumerGroup.updateMember(member2);
+
+        // With no member passed in, the group alone should now yield foo and bar.
+        assertEquals(
+            mkMap(
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
+            ),
+            consumerGroup.computeSubscriptionMetadata(
+                null,
+                null,
+                image
+            )
+        );
+
+        // Pass member 2 in the first position only to model its removal; only foo should remain.
+        assertEquals(
+            mkMap(
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
+            ),
+            consumerGroup.computeSubscriptionMetadata(
+                member2,
+                null,
+                image
+            )
+        );
+
+        // Pass member 1 in the first position only to model its removal; only bar should remain.
+        assertEquals(
+            mkMap(
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
+            ),
+            consumerGroup.computeSubscriptionMetadata(
+                member1,
+                null,
+                image
+            )
+        );
+
+        // Pass member 3 in the second position (not yet in the group); zar should join foo and bar.
+        assertEquals(
+            mkMap(
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
+            ),
+            consumerGroup.computeSubscriptionMetadata(
+                null,
+                member3,
+                image
+            )
+        );
+
+        // Add member 3 to the group.
+        consumerGroup.updateMember(member3);
+
+        // With no member passed in, the group alone should now yield foo, bar and zar.
+        assertEquals(
+            mkMap(
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
+            ),
+            consumerGroup.computeSubscriptionMetadata(
+                null,
+                null,
+                image
+            )
+        );
+    }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java
index bbe5cc5e096..037a6ccbcd4 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java
@@ -44,7 +44,7 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(10)
-            .setNextMemberEpoch(10)
+            .setTargetMemberEpoch(10)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(topicId1, 1, 2, 3),
                 mkTopicAssignment(topicId2, 4, 5, 6)))
@@ -65,7 +65,7 @@ public class CurrentAssignmentBuilderTest {
         assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
         assertEquals(10, updatedMember.previousMemberEpoch());
         assertEquals(10, updatedMember.memberEpoch());
-        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(11, updatedMember.targetMemberEpoch());
         assertEquals(mkAssignment(
             mkTopicAssignment(topicId1, 3),
             mkTopicAssignment(topicId2, 6)
@@ -88,7 +88,7 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(10)
-            .setNextMemberEpoch(10)
+            .setTargetMemberEpoch(10)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(topicId1, 1, 2, 3),
                 mkTopicAssignment(topicId2, 4, 5, 6)))
@@ -109,7 +109,7 @@ public class CurrentAssignmentBuilderTest {
         assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
         assertEquals(10, updatedMember.previousMemberEpoch());
         assertEquals(11, updatedMember.memberEpoch());
-        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(11, updatedMember.targetMemberEpoch());
         assertEquals(mkAssignment(
             mkTopicAssignment(topicId1, 1, 2, 3),
             mkTopicAssignment(topicId2, 4, 5, 6)
@@ -129,7 +129,7 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(10)
-            .setNextMemberEpoch(10)
+            .setTargetMemberEpoch(10)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(topicId1, 1, 2, 3),
                 mkTopicAssignment(topicId2, 4, 5, 6)))
@@ -150,7 +150,7 @@ public class CurrentAssignmentBuilderTest {
         assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state());
         assertEquals(10, updatedMember.previousMemberEpoch());
         assertEquals(11, updatedMember.memberEpoch());
-        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(11, updatedMember.targetMemberEpoch());
         assertEquals(mkAssignment(
             mkTopicAssignment(topicId1, 1, 2, 3),
             mkTopicAssignment(topicId2, 4, 5, 6)
@@ -179,7 +179,7 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(10)
-            .setNextMemberEpoch(11)
+            .setTargetMemberEpoch(11)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(topicId1, 3),
                 mkTopicAssignment(topicId2, 6)))
@@ -207,7 +207,7 @@ public class CurrentAssignmentBuilderTest {
         assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
         assertEquals(10, updatedMember.previousMemberEpoch());
         assertEquals(10, updatedMember.memberEpoch());
-        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(11, updatedMember.targetMemberEpoch());
         assertEquals(mkAssignment(
             mkTopicAssignment(topicId1, 3),
             mkTopicAssignment(topicId2, 6)
@@ -230,7 +230,7 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(10)
-            .setNextMemberEpoch(11)
+            .setTargetMemberEpoch(11)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(topicId1, 3),
                 mkTopicAssignment(topicId2, 6)))
@@ -260,7 +260,7 @@ public class CurrentAssignmentBuilderTest {
         assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
         assertEquals(10, updatedMember.previousMemberEpoch());
         assertEquals(11, updatedMember.memberEpoch());
-        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(11, updatedMember.targetMemberEpoch());
         assertEquals(mkAssignment(
             mkTopicAssignment(topicId1, 3),
             mkTopicAssignment(topicId2, 6)
@@ -280,7 +280,7 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(10)
-            .setNextMemberEpoch(11)
+            .setTargetMemberEpoch(11)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(topicId1, 3),
                 mkTopicAssignment(topicId2, 6)))
@@ -310,7 +310,7 @@ public class CurrentAssignmentBuilderTest {
         assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state());
         assertEquals(10, updatedMember.previousMemberEpoch());
         assertEquals(11, updatedMember.memberEpoch());
-        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(11, updatedMember.targetMemberEpoch());
         assertEquals(mkAssignment(
             mkTopicAssignment(topicId1, 3, 4, 5),
             mkTopicAssignment(topicId2, 6, 7, 8)
@@ -327,7 +327,7 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(10)
-            .setNextMemberEpoch(11)
+            .setTargetMemberEpoch(11)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(topicId1, 3),
                 mkTopicAssignment(topicId2, 6)))
@@ -358,7 +358,7 @@ public class CurrentAssignmentBuilderTest {
         assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state());
         assertEquals(10, updatedMember.previousMemberEpoch());
         assertEquals(12, updatedMember.memberEpoch());
-        assertEquals(12, updatedMember.nextMemberEpoch());
+        assertEquals(12, updatedMember.targetMemberEpoch());
         assertEquals(mkAssignment(
             mkTopicAssignment(topicId1, 1, 2, 3),
             mkTopicAssignment(topicId2, 4, 5, 6)
@@ -375,7 +375,7 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(11)
-            .setNextMemberEpoch(11)
+            .setTargetMemberEpoch(11)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(topicId1, 3),
                 mkTopicAssignment(topicId2, 6)))
@@ -404,7 +404,7 @@ public class CurrentAssignmentBuilderTest {
         assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
         assertEquals(10, updatedMember.previousMemberEpoch());
         assertEquals(11, updatedMember.memberEpoch());
-        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(11, updatedMember.targetMemberEpoch());
         assertEquals(mkAssignment(
             mkTopicAssignment(topicId1, 3, 4, 5),
             mkTopicAssignment(topicId2, 6)
@@ -423,7 +423,7 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(11)
-            .setNextMemberEpoch(11)
+            .setTargetMemberEpoch(11)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(topicId1, 3),
                 mkTopicAssignment(topicId2, 6)))
@@ -447,7 +447,7 @@ public class CurrentAssignmentBuilderTest {
         assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state());
         assertEquals(10, updatedMember.previousMemberEpoch());
         assertEquals(11, updatedMember.memberEpoch());
-        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(11, updatedMember.targetMemberEpoch());
         assertEquals(mkAssignment(
             mkTopicAssignment(topicId1, 3, 4, 5),
             mkTopicAssignment(topicId2, 6, 7, 8)
@@ -464,7 +464,7 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setMemberEpoch(11)
             .setPreviousMemberEpoch(11)
-            .setNextMemberEpoch(11)
+            .setTargetMemberEpoch(11)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(topicId1, 3, 4, 5),
                 mkTopicAssignment(topicId2, 6, 7, 8)))
@@ -485,7 +485,7 @@ public class CurrentAssignmentBuilderTest {
         assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state());
         assertEquals(11, updatedMember.previousMemberEpoch());
         assertEquals(11, updatedMember.memberEpoch());
-        assertEquals(11, updatedMember.nextMemberEpoch());
+        assertEquals(11, updatedMember.targetMemberEpoch());
         assertEquals(mkAssignment(
             mkTopicAssignment(topicId1, 3, 4, 5),
             mkTopicAssignment(topicId2, 6, 7, 8)
@@ -502,7 +502,7 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setMemberEpoch(10)
             .setPreviousMemberEpoch(10)
-            .setNextMemberEpoch(11)
+            .setTargetMemberEpoch(11)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(topicId1, 3),
                 mkTopicAssignment(topicId2, 6)))
@@ -529,7 +529,7 @@ public class CurrentAssignmentBuilderTest {
         assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
         assertEquals(10, updatedMember.previousMemberEpoch());
         assertEquals(10, updatedMember.memberEpoch());
-        assertEquals(12, updatedMember.nextMemberEpoch());
+        assertEquals(12, updatedMember.targetMemberEpoch());
         assertEquals(Collections.emptyMap(), updatedMember.assignedPartitions());
         assertEquals(mkAssignment(
             mkTopicAssignment(topicId1, 1, 2, 3),