You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "jeffkbkim (via GitHub)" <gi...@apache.org> on 2023/04/10 21:44:45 UTC

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13476: KAFKA-14462; [4/N] Add GroupMetadataManager: ConsumerGroups Management, Members Management and Reconciliation Logic

jeffkbkim commented on code in PR #13476:
URL: https://github.com/apache/kafka/pull/13476#discussion_r1161913599


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMemberAssignment;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:

Review Comment:
   What is the soft state, and how is it mutated?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMemberAssignment;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class GroupMetadataManager {
+
+    public static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+        private List<PartitionAssignor> assignors = null;
+        private TopicsImage topicsImage = null;
+        private int consumerGroupMaxSize = Integer.MAX_VALUE;
+        private int consumerGroupHeartbeatIntervalMs = 5000;
+
+        Builder withLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        Builder withAssignors(List<PartitionAssignor> assignors) {
+            this.assignors = assignors;
+            return this;
+        }
+
+        Builder withConsumerGroupMaxSize(int consumerGroupMaxSize) {
+            this.consumerGroupMaxSize = consumerGroupMaxSize;
+            return this;
+        }
+
+        Builder withConsumerGroupHeartbeatInterval(int consumerGroupHeartbeatIntervalMs) {
+            this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+            return this;
+        }
+
+        Builder withTopicsImage(TopicsImage topicsImage) {
+            this.topicsImage = topicsImage;
+            return this;
+        }
+
+        GroupMetadataManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+            if (topicsImage == null) topicsImage = TopicsImage.EMPTY;
+
+            if (assignors == null || assignors.isEmpty()) {
+                throw new IllegalStateException("Assignors must be set before building.");
+            }
+
+            return new GroupMetadataManager(
+                snapshotRegistry,
+                logContext,
+                assignors,
+                topicsImage,
+                consumerGroupMaxSize,
+                consumerGroupHeartbeatIntervalMs
+            );
+        }
+    }
+
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The list of supported assignors.
+     */
+    private final Map<String, PartitionAssignor> assignors;
+
+    /**
+     * The default assignor used.
+     */
+    private final PartitionAssignor defaultAssignor;
+
+    /**
+     * The generic and consumer groups keyed by their name.
+     */
+    private final TimelineHashMap<String, Group> groups;
+
+    /**
+     * The generic and consumer groups keyed by their subscribed topics.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<Group>> groupsByTopicName;

Review Comment:
   I don't see this being used right now. Should we update it once we accept a heartbeat request with `SubscribedTopicNames`?
   
   I assume this will be used to trigger a new assignment computation when a topic's metadata changes?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * A Consumer Group. All the metadata in this class are backed by
+ * records in the __consumer_offsets partitions.
+ */
+public class ConsumerGroup implements Group {
+
+    public enum ConsumerGroupState {
+        EMPTY("empty"),
+        ASSIGNING("assigning"),
+        RECONCILING("reconciling"),
+        STABLE("stable"),
+        DEAD("dead");
+
+        private final String name;
+
+        ConsumerGroupState(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<ConsumerGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the subscriptions
+     * are updated and it will trigger the computation of a new assignment
+     * for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, ConsumerGroupMember> members;
+
+    /**
+     * The metadata of the subscribed topics.
+     */
+    private final TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata;
+
+    /**
+     * The assignment epoch. An assignment epoch smaller than the group epoch means
+     * that a new assignment is required. The assignment epoch is updated when a new
+     * assignment is installed.
+     */
+    private final TimelineInteger assignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private final TimelineHashMap<String, ConsumerGroupMemberAssignment> assignments;
+
+    /**
+     * The current partition epoch maps each topic-partition to its current epoch where
+     * the epoch is the epoch of their owners. When a member revokes a partition, it removes
+     * itself from this map. When a member gets a partition, it adds itself to this map.
+     */
+    private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
+
+    public ConsumerGroup(
+        SnapshotRegistry snapshotRegistry,
+        String groupId
+    ) {
+        this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+        this.groupId = Objects.requireNonNull(groupId);
+        this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY);
+        this.groupEpoch = new TimelineInteger(snapshotRegistry);
+        this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.assignmentEpoch = new TimelineInteger(snapshotRegistry);
+        this.assignments = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
+    }
+
+    /**
+     * The type of this group.
+     *
+     * @return The group type (Consumer).
+     */
+    @Override
+    public GroupType type() {
+        return GroupType.CONSUMER;
+    }
+
+    /**
+     * The state of this group.
+     *
+     * @return The current state as a String.
+     */
+    @Override
+    public String stateAsString() {
+        return state.get().toString();
+    }
+
+    /**
+     * The group id.
+     *
+     * @return The group id.
+     */
+    @Override
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * The state of this group.
+     *
+     * @return The current state.
+     */
+    public ConsumerGroupState state() {
+        return state.get();
+    }
+
+    /**
+     * Returns the current group epoch.
+     *
+     * @return The group epoch.
+     */
+    public int groupEpoch() {
+        return groupEpoch.get();
+    }
+
+    /**
+     * Sets the group epoch.
+     *
+     * @param groupEpoch The new group epoch.
+     */
+    public void setGroupEpoch(int groupEpoch) {
+        this.groupEpoch.set(groupEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Returns the current assignment epoch.
+     *
+     * @return The current assignment epoch.
+     */
+    public int assignmentEpoch() {
+        return assignmentEpoch.get();
+    }
+
+    /**
+     * Sets the assignment epoch.
+     *
+     * @param assignmentEpoch The new assignment epoch.
+     */
+    public void setAssignmentEpoch(int assignmentEpoch) {
+        this.assignmentEpoch.set(assignmentEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Gets or creates a member.
+     *
+     * @param memberId          The member id.
+     * @param createIfNotExists Boolean indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     */
+    public ConsumerGroupMember getOrMaybeCreateMember(
+        String memberId,
+        boolean createIfNotExists
+    ) {
+        ConsumerGroupMember member = members.get(memberId);
+        if (member == null) {
+            if (!createIfNotExists) {
+                throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
+                    memberId, groupId));
+            }
+            member = new ConsumerGroupMember.Builder(memberId).build();
+            members.put(memberId, member);
+        }
+
+        return member;
+    }
+
+    /**
+     * Updates the member.
+     *
+     * @param newMember The new member state.
+     */
+    public void updateMember(ConsumerGroupMember newMember) {
+        ConsumerGroupMember oldMember = members.put(newMember.memberId(), newMember);
+        maybeUpdatePartitionEpoch(oldMember, newMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Remove the member from the group.
+     *
+     * @param memberId The member id to remove.
+     */
+    public void removeMember(String memberId) {
+        ConsumerGroupMember member = members.remove(memberId);
+        maybeRemovePartitionEpoch(member);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Returns true if the member exists.
+     *
+     * @param memberId The member id.
+     *
+     * @return A boolean indicating whether the member exists or not.
+     */
+    public boolean hasMember(String memberId) {
+        return members.containsKey(memberId);
+    }
+
+    /**
+     * Returns the number of members in the group.
+     *
+     * @return The number of members.
+     */
+    public int numMembers() {
+        return members.size();
+    }
+
+    /**
+     * Returns the members keyed by their id.
+     *
+     * @return An immutable Map containing all the members.
+     */
+    public Map<String, ConsumerGroupMember> members() {
+        return Collections.unmodifiableMap(members);
+    }
+
+    /**
+     * Returns the current target assignment of the member.
+     *
+     * @return The ConsumerGroupMemberAssignment or an EMPTY one if it does not
+     *         exist.
+     */
+    public ConsumerGroupMemberAssignment targetAssignment(String memberId) {
+        return assignments.getOrDefault(memberId, ConsumerGroupMemberAssignment.EMPTY);
+    }
+
+    /**
+     * Updates target assignment of a member.
+     *
+     * @param memberId              The member id.
+     * @param newTargetAssignment   The new target assignment.
+     */
+    public void updateTargetAssignment(String memberId, ConsumerGroupMemberAssignment newTargetAssignment) {
+        assignments.put(memberId, newTargetAssignment);
+    }
+
+    /**
+     * Removes the target assignment of a member.
+     *
+     * @param memberId The member id.
+     */
+    public void removeTargetAssignment(String memberId) {
+        assignments.remove(memberId);
+    }
+
+    /**
+     * Returns the target assignments for the entire group.
+     *
+     * @return An immutable Map containing all the target assignments.
+     */
+    public Map<String, ConsumerGroupMemberAssignment> targetAssignments() {
+        return Collections.unmodifiableMap(assignments);
+    }
+
+    /**
+     * Returns the current epoch of a partition or -1 if the partition
+     * does not have one.
+     *
+     * @param topicId       The topic id.
+     * @param partitionId   The partition id.
+     *
+     * @return The epoch or -1.
+     */
+    public int currentPartitionEpoch(
+        Uuid topicId, int partitionId
+    ) {
+        Map<Integer, Integer> partitions = currentPartitionEpoch.get(topicId);
+        if (partitions == null) {
+            return -1;
+        } else {
+            return partitions.getOrDefault(partitionId, -1);
+        }
+    }
+
+    /**
+     * Compute the preferred (server side) assignor for the group while
+     * using the provided assignor for the member.
+     *
+     * @param updatedMemberId       The member id.
+     * @param serverAssignorNameOpt The assignor name.
+     *
+     * @return An Optional containing the preferred assignor.
+     */
+    public Optional<String> preferredServerAssignor(
+        String updatedMemberId,
+        Optional<String> serverAssignorNameOpt
+    ) {
+        Map<String, Integer> counts = new HashMap<>();
+
+        serverAssignorNameOpt.ifPresent(serverAssignorName ->
+            counts.put(serverAssignorName, 1)
+        );
+
+        members.forEach((memberId, member) -> {
+            if (!memberId.equals(updatedMemberId) && member.serverAssignorName().isPresent()) {
+                counts.compute(member.serverAssignorName().get(), (k, v) -> v == null ? 1 : v + 1);
+            }
+        });
+
+        return counts.entrySet().stream()
+            .max(Map.Entry.comparingByValue())
+            .map(Map.Entry::getKey);
+    }
+
+    /**
+     * Returns the subscription metadata for all the topics whose
+     * members are subscribed to.
+     *
+     * @return An immutable Map containing the subscription metadata.
+     */
+    public Map<String, TopicMetadata> subscriptionMetadata() {
+        return Collections.unmodifiableMap(subscribedTopicMetadata);
+    }
+
+    /**
+     * Updates the subscription metadata. This replaces the previous one.
+     *
+     * @param subscriptionMetadata The new subscription metadata.
+     */
+    public void setSubscriptionMetadata(
+        Map<String, TopicMetadata> subscriptionMetadata
+    ) {
+        this.subscribedTopicMetadata.clear();
+        this.subscribedTopicMetadata.putAll(subscriptionMetadata);
+    }
+
+    /**
+     * Computes new subscription metadata but with specific information for
+     * a member.
+     *
+     * @param updatedMemberId               The member id.
+     * @param updatedMemberSubscriptions    The member's updated topic subscriptions.
+     * @param topicsImage                   The topics metadata.
+     *
+     * @return The new subscription metadata as an immutable Map.
+     */
+    public Map<String, TopicMetadata> computeSubscriptionMetadata(
+        String updatedMemberId,

Review Comment:
   I realize it should be read as "the updated member's id", but I initially read it as "the updated memberId". Can we just use `memberId`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMemberAssignment;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class GroupMetadataManager {
+
+    public static class Builder {
+        // Optional collaborators: build() substitutes a default for each one left unset.
+        private LogContext logContext;
+        private SnapshotRegistry snapshotRegistry;
+        private TopicsImage topicsImage;
+
+        // Mandatory: build() rejects a missing or empty list.
+        private List<PartitionAssignor> assignors;
+
+        // Tunables and their default values.
+        private int consumerGroupMaxSize = Integer.MAX_VALUE;
+        private int consumerGroupHeartbeatIntervalMs = 5000;
+
+        /** Sets the log context; returns this builder for chaining. */
+        Builder withLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        /** Sets the snapshot registry; returns this builder for chaining. */
+        Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        /** Sets the supported assignors; returns this builder for chaining. */
+        Builder withAssignors(List<PartitionAssignor> assignors) {
+            this.assignors = assignors;
+            return this;
+        }
+
+        /** Sets the maximum consumer group size; returns this builder for chaining. */
+        Builder withConsumerGroupMaxSize(int consumerGroupMaxSize) {
+            this.consumerGroupMaxSize = consumerGroupMaxSize;
+            return this;
+        }
+
+        /** Sets the consumer group heartbeat interval; returns this builder for chaining. */
+        Builder withConsumerGroupHeartbeatInterval(int consumerGroupHeartbeatIntervalMs) {
+            this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+            return this;
+        }
+
+        /** Sets the topics image; returns this builder for chaining. */
+        Builder withTopicsImage(TopicsImage topicsImage) {
+            this.topicsImage = topicsImage;
+            return this;
+        }
+
+        /**
+         * Creates the GroupMetadataManager from the collected settings.
+         *
+         * @return The new GroupMetadataManager.
+         * @throws IllegalStateException if no assignors were provided.
+         */
+        GroupMetadataManager build() {
+            // Fill in the defaults. The snapshot registry default depends on the
+            // log context, so the log context must be resolved first.
+            if (logContext == null) {
+                logContext = new LogContext();
+            }
+            if (snapshotRegistry == null) {
+                snapshotRegistry = new SnapshotRegistry(logContext);
+            }
+            if (topicsImage == null) {
+                topicsImage = TopicsImage.EMPTY;
+            }
+
+            final boolean noAssignors = assignors == null || assignors.isEmpty();
+            if (noAssignors) {
+                throw new IllegalStateException("Assignors must be set before building.");
+            }
+
+            return new GroupMetadataManager(
+                snapshotRegistry,
+                logContext,
+                assignors,
+                topicsImage,
+                consumerGroupMaxSize,
+                consumerGroupHeartbeatIntervalMs
+            );
+        }
+    }
+
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
+    /**
+     * The snapshot registry. It is handed to the timeline collections of this
+     * class and to the consumer groups that this manager creates.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The supported assignors keyed by their name.
+     */
+    private final Map<String, PartitionAssignor> assignors;
+
+    /**
+     * The default assignor used. This is the first assignor of the list
+     * provided at construction time.
+     */
+    private final PartitionAssignor defaultAssignor;
+
+    /**
+     * The generic and consumer groups keyed by their group id.
+     */
+    private final TimelineHashMap<String, Group> groups;
+
+    /**
+     * The generic and consumer groups keyed by their subscribed topics.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<Group>> groupsByTopicName;
+
+    /**
+     * The maximum number of members allowed in a single consumer group.
+     */
+    private final int consumerGroupMaxSize;
+
+    /**
+     * The heartbeat interval for consumer groups, in milliseconds.
+     */
+    private final int consumerGroupHeartbeatIntervalMs;
+
+    /**
+     * The topics metadata (or image). Unlike the other fields, it is not final.
+     */
+    private TopicsImage topicsImage;
+
+    private GroupMetadataManager(
+        SnapshotRegistry snapshotRegistry,
+        LogContext logContext,
+        List<PartitionAssignor> assignors,
+        TopicsImage topicsImage,
+        int consumerGroupMaxSize,
+        int consumerGroupHeartbeatIntervalMs
+    ) {
+        this.log = logContext.logger(GroupMetadataManager.class);
+        this.snapshotRegistry = snapshotRegistry;
+        this.topicsImage = topicsImage;
+        this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
+        this.defaultAssignor = assignors.get(0);
+        this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.groupsByTopicName = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.consumerGroupMaxSize = consumerGroupMaxSize;
+        this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+    }
+
+    /**
+     * Gets or maybe creates a consumer group.
+     *
+     * @param groupId           The group id.
+     * @param createIfNotExists A boolean indicating whether the group should be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroup.
+     * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
+     *                                  if the group is not a consumer group.
+     */
+    // Package private for testing.
+    ConsumerGroup getOrMaybeCreateConsumerGroup(
+        String groupId,
+        boolean createIfNotExists
+    ) throws GroupIdNotFoundException {
+        Group group = groups.get(groupId);
+
+        if (group == null && !createIfNotExists) {
+            throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
+        }
+
+        if (group == null) {
+            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
+            groups.put(groupId, consumerGroup);
+            return consumerGroup;
+        } else {
+            if (group.type() == Group.GroupType.CONSUMER) {
+                return (ConsumerGroup) group;
+            } else {
+                // We don't support upgrading/downgrading between protocols at the moment so
+                // we throw an exception if a group exists with the wrong type.
+                throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
+            }
+        }
+    }
+
+    /**
+     * Removes the group.
+     *
+     * @param groupId The group id.
+     */
+    private void removeGroup(String groupId) {
+        // Drop the entry registered under this group id from the groups map.
+        groups.remove(groupId);
+    }
+
+    /**
+     * Validates the request.
+     *
+     * @param request The request to validate.
+     *
+     * @throws InvalidRequestException if the request is not valid.
+     * @throws UnsupportedAssignorException if the assignor is not supported.
+     */
+    private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
+        ConsumerGroupHeartbeatRequestData request
+    ) throws InvalidRequestException, UnsupportedAssignorException {
+        if (request.groupId().isEmpty()) {
+            throw new InvalidRequestException("GroupId can't be empty.");
+        }
+
+        if (request.memberEpoch() > 0 || request.memberEpoch() == -1) {
+            if (request.memberId().isEmpty()) {
+                throw new InvalidRequestException("MemberId can't be empty.");
+            }
+            if (request.instanceId() != null) {
+                throw new InvalidRequestException("InstanceId should only be provided in first request.");
+            }
+            if (request.rackId() != null) {
+                throw new InvalidRequestException("RackId should only be provided in first request.");
+            }
+        } else if (request.memberEpoch() == 0) {
+            if (request.rebalanceTimeoutMs() == -1) {
+                throw new InvalidRequestException("RebalanceTimeoutMs must be provided in first request.");
+            }
+            if (request.topicPartitions() == null || !request.topicPartitions().isEmpty()) {
+                throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
+            }
+            if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
+                throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
+            }
+            if (request.serverAssignor() != null && !assignors.containsKey(request.serverAssignor())) {
+                throw new UnsupportedAssignorException("ServerAssignor " + request.serverAssignor()
+                    + " is not supported. Supported assignors: " + String.join(", ", assignors.keySet())
+                    + ".");
+            }
+        } else {
+            throw new InvalidRequestException("MemberEpoch is invalid.");
+        }
+
+        if (request.subscribedTopicRegex() != null) {
+            throw new InvalidRequestException("SubscribedTopicRegex is not supported yet.");
+        }
+
+        if (request.clientAssignors() != null) {
+            // TODO We need to remove them from the request.
+            throw new InvalidRequestException("Client side assignors are not supported yet.");
+        }
+    }
+
+    /**
+     * Verifies that the partitions currently owned by the member (the ones set in the
+     * request) matches the ones that the member should own. It matches if the client
+     * has a least of subset of them.

Review Comment:
   nit: has at least



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMemberAssignment;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class GroupMetadataManager {
+
+    public static class Builder {
+        // Optional collaborators: build() substitutes a default for each one left unset.
+        private LogContext logContext;
+        private SnapshotRegistry snapshotRegistry;
+        private TopicsImage topicsImage;
+
+        // Mandatory: build() rejects a missing or empty list.
+        private List<PartitionAssignor> assignors;
+
+        // Tunables and their default values.
+        private int consumerGroupMaxSize = Integer.MAX_VALUE;
+        private int consumerGroupHeartbeatIntervalMs = 5000;
+
+        /** Sets the log context; returns this builder for chaining. */
+        Builder withLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        /** Sets the snapshot registry; returns this builder for chaining. */
+        Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        /** Sets the supported assignors; returns this builder for chaining. */
+        Builder withAssignors(List<PartitionAssignor> assignors) {
+            this.assignors = assignors;
+            return this;
+        }
+
+        /** Sets the maximum consumer group size; returns this builder for chaining. */
+        Builder withConsumerGroupMaxSize(int consumerGroupMaxSize) {
+            this.consumerGroupMaxSize = consumerGroupMaxSize;
+            return this;
+        }
+
+        /** Sets the consumer group heartbeat interval; returns this builder for chaining. */
+        Builder withConsumerGroupHeartbeatInterval(int consumerGroupHeartbeatIntervalMs) {
+            this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+            return this;
+        }
+
+        /** Sets the topics image; returns this builder for chaining. */
+        Builder withTopicsImage(TopicsImage topicsImage) {
+            this.topicsImage = topicsImage;
+            return this;
+        }
+
+        /**
+         * Creates the GroupMetadataManager from the collected settings.
+         *
+         * @return The new GroupMetadataManager.
+         * @throws IllegalStateException if no assignors were provided.
+         */
+        GroupMetadataManager build() {
+            // Fill in the defaults. The snapshot registry default depends on the
+            // log context, so the log context must be resolved first.
+            if (logContext == null) {
+                logContext = new LogContext();
+            }
+            if (snapshotRegistry == null) {
+                snapshotRegistry = new SnapshotRegistry(logContext);
+            }
+            if (topicsImage == null) {
+                topicsImage = TopicsImage.EMPTY;
+            }
+
+            final boolean noAssignors = assignors == null || assignors.isEmpty();
+            if (noAssignors) {
+                throw new IllegalStateException("Assignors must be set before building.");
+            }
+
+            return new GroupMetadataManager(
+                snapshotRegistry,
+                logContext,
+                assignors,
+                topicsImage,
+                consumerGroupMaxSize,
+                consumerGroupHeartbeatIntervalMs
+            );
+        }
+    }
+
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
+    /**
+     * The snapshot registry. It is handed to the timeline collections of this
+     * class and to the consumer groups that this manager creates.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The supported assignors keyed by their name.
+     */
+    private final Map<String, PartitionAssignor> assignors;
+
+    /**
+     * The default assignor used. This is the first assignor of the list
+     * provided at construction time.
+     */
+    private final PartitionAssignor defaultAssignor;
+
+    /**
+     * The generic and consumer groups keyed by their group id.
+     */
+    private final TimelineHashMap<String, Group> groups;
+
+    /**
+     * The generic and consumer groups keyed by their subscribed topics.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<Group>> groupsByTopicName;
+
+    /**
+     * The maximum number of members allowed in a single consumer group.
+     */
+    private final int consumerGroupMaxSize;
+
+    /**
+     * The heartbeat interval for consumer groups, in milliseconds.
+     */
+    private final int consumerGroupHeartbeatIntervalMs;
+
+    /**
+     * The topics metadata (or image). Unlike the other fields, it is not final.
+     */
+    private TopicsImage topicsImage;
+
+    private GroupMetadataManager(
+        SnapshotRegistry snapshotRegistry,
+        LogContext logContext,
+        List<PartitionAssignor> assignors,
+        TopicsImage topicsImage,
+        int consumerGroupMaxSize,
+        int consumerGroupHeartbeatIntervalMs
+    ) {
+        this.log = logContext.logger(GroupMetadataManager.class);
+        this.snapshotRegistry = snapshotRegistry;
+        this.topicsImage = topicsImage;
+        this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
+        this.defaultAssignor = assignors.get(0);
+        this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.groupsByTopicName = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.consumerGroupMaxSize = consumerGroupMaxSize;
+        this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+    }
+
+    /**
+     * Gets or maybe creates a consumer group.
+     *
+     * @param groupId           The group id.
+     * @param createIfNotExists A boolean indicating whether the group should be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroup.
+     * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
+     *                                  if the group is not a consumer group.
+     */
+    // Package private for testing.
+    ConsumerGroup getOrMaybeCreateConsumerGroup(
+        String groupId,
+        boolean createIfNotExists
+    ) throws GroupIdNotFoundException {
+        Group group = groups.get(groupId);
+
+        if (group == null && !createIfNotExists) {
+            throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
+        }
+
+        if (group == null) {
+            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId);
+            groups.put(groupId, consumerGroup);
+            return consumerGroup;
+        } else {
+            if (group.type() == Group.GroupType.CONSUMER) {
+                return (ConsumerGroup) group;
+            } else {
+                // We don't support upgrading/downgrading between protocols at the moment so
+                // we throw an exception if a group exists with the wrong type.
+                throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
+            }
+        }
+    }
+
+    /**
+     * Removes the group.
+     *
+     * @param groupId The group id.
+     */
+    private void removeGroup(String groupId) {
+        // Drop the entry registered under this group id from the groups map.
+        groups.remove(groupId);
+    }
+
+    /**
+     * Validates the request.
+     *
+     * @param request The request to validate.
+     *
+     * @throws InvalidRequestException if the request is not valid.
+     * @throws UnsupportedAssignorException if the assignor is not supported.
+     */
+    private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
+        ConsumerGroupHeartbeatRequestData request
+    ) throws InvalidRequestException, UnsupportedAssignorException {
+        if (request.groupId().isEmpty()) {
+            throw new InvalidRequestException("GroupId can't be empty.");
+        }
+
+        if (request.memberEpoch() > 0 || request.memberEpoch() == -1) {
+            if (request.memberId().isEmpty()) {
+                throw new InvalidRequestException("MemberId can't be empty.");
+            }
+            if (request.instanceId() != null) {
+                throw new InvalidRequestException("InstanceId should only be provided in first request.");
+            }
+            if (request.rackId() != null) {
+                throw new InvalidRequestException("RackId should only be provided in first request.");
+            }
+        } else if (request.memberEpoch() == 0) {
+            if (request.rebalanceTimeoutMs() == -1) {
+                throw new InvalidRequestException("RebalanceTimeoutMs must be provided in first request.");
+            }
+            if (request.topicPartitions() == null || !request.topicPartitions().isEmpty()) {
+                throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
+            }
+            if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
+                throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
+            }
+            if (request.serverAssignor() != null && !assignors.containsKey(request.serverAssignor())) {
+                throw new UnsupportedAssignorException("ServerAssignor " + request.serverAssignor()
+                    + " is not supported. Supported assignors: " + String.join(", ", assignors.keySet())
+                    + ".");
+            }
+        } else {
+            throw new InvalidRequestException("MemberEpoch is invalid.");
+        }
+
+        if (request.subscribedTopicRegex() != null) {
+            throw new InvalidRequestException("SubscribedTopicRegex is not supported yet.");
+        }
+
+        if (request.clientAssignors() != null) {
+            // TODO We need to remove them from the request.
+            throw new InvalidRequestException("Client side assignors are not supported yet.");
+        }
+    }
+
+    /**
+     * Verifies that the partitions currently owned by the member (the ones set in the
+     * request) matches the ones that the member should own. It matches if the client
+     * has a least of subset of them.
+     *
+     * @param ownedTopicPartitions  The partitions provided by the client in the request.
+     * @param target                The partitions that they client should have.

Review Comment:
   nit: that the client/member



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMemberAssignment;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class GroupMetadataManager {
+
+    public static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+        private List<PartitionAssignor> assignors = null;
+        private TopicsImage topicsImage = null;
+        private int consumerGroupMaxSize = Integer.MAX_VALUE;
+        private int consumerGroupHeartbeatIntervalMs = 5000;
+
+        Builder withLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        Builder withAssignors(List<PartitionAssignor> assignors) {
+            this.assignors = assignors;
+            return this;
+        }
+
+        Builder withConsumerGroupMaxSize(int consumerGroupMaxSize) {
+            this.consumerGroupMaxSize = consumerGroupMaxSize;
+            return this;
+        }
+
+        Builder withConsumerGroupHeartbeatInterval(int consumerGroupHeartbeatIntervalMs) {
+            this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+            return this;
+        }
+
+        Builder withTopicsImage(TopicsImage topicsImage) {
+            this.topicsImage = topicsImage;
+            return this;
+        }
+
+        GroupMetadataManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+            if (topicsImage == null) topicsImage = TopicsImage.EMPTY;
+
+            if (assignors == null || assignors.isEmpty()) {
+                throw new IllegalStateException("Assignors must be set before building.");
+            }
+
+            return new GroupMetadataManager(
+                snapshotRegistry,
+                logContext,
+                assignors,
+                topicsImage,
+                consumerGroupMaxSize,
+                consumerGroupHeartbeatIntervalMs
+            );
+        }
+    }
+
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The list of supported assignors.
+     */
+    private final Map<String, PartitionAssignor> assignors;
+
+    /**
+     * The default assignor used.
+     */
+    private final PartitionAssignor defaultAssignor;
+
+    /**
+     * The generic and consumer groups keyed by their name.
+     */
+    private final TimelineHashMap<String, Group> groups;
+
+    /**
+     * The generic and consumer groups keyed by their subscribed topics.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<Group>> groupsByTopicName;
+
+    /**
+     * The maximum number of members allowed in a single consumer group.
+     */
+    private final int consumerGroupMaxSize;
+
+    /**
+     * The heartbeat interval for consumer groups.
+     */
+    private final int consumerGroupHeartbeatIntervalMs;
+
+    /**
+     * The topics metadata (or image).
+     */
+    private TopicsImage topicsImage;

Review Comment:
   Will `topicsImage` be updated when we read changes from the metadata log?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class RecordHelpers {
+    private RecordHelpers() {}
+
+    public static Record newMemberSubscriptionRecord(
+        String groupId,
+        ConsumerGroupMember member
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupMemberMetadataKey()
+                    .setGroupId(groupId)
+                    .setMemberId(member.memberId()),
+                (short) 5
+            ),
+            new ApiMessageAndVersion(
+                new ConsumerGroupMemberMetadataValue()
+                    .setRackId(member.rackId())
+                    .setInstanceId(member.instanceId())
+                    .setClientId(member.clientId())
+                    .setClientHost(member.clientHost())
+                    .setSubscribedTopicNames(member.subscribedTopicNames())
+                    .setSubscribedTopicRegex(member.subscribedTopicRegex())
+                    .setServerAssignor(member.serverAssignorName().orElse(null))
+                    .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                    .setAssignors(member.clientAssignors().stream().map(assignorState ->
+                        new ConsumerGroupMemberMetadataValue.Assignor()
+                            .setName(assignorState.name())
+                            .setReason(assignorState.reason())
+                            .setMinimumVersion(assignorState.minimumVersion())
+                            .setMaximumVersion(assignorState.maximumVersion())
+                            .setVersion(assignorState.metadataVersion())
+                            .setMetadata(assignorState.metadataBytes().array())
+                    ).collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    public static Record newMemberSubscriptionTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupMemberMetadataKey()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId),
+                (short) 5
+            ),
+            null // Tombstone.
+        );
+    }
+
+    public static Record newGroupSubscriptionMetadataRecord(
+        String groupId,
+        Map<String, TopicMetadata> newSubscriptionMetadata
+    ) {
+        ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue();
+        newSubscriptionMetadata.forEach((topicName, topicMetadata) ->
+            value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata()
+                .setTopicId(topicMetadata.id())
+                .setTopicName(topicMetadata.name())
+                .setNumPartitions(topicMetadata.numPartitions())
+            )
+        );
+
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupPartitionMetadataKey()
+                    .setGroupId(groupId),
+                (short) 4
+            ),
+            new ApiMessageAndVersion(
+                value,
+                (short) 0
+            )
+        );
+    }
+
+    public static Record newGroupSubscriptionMetadataTombstoneRecord(
+        String groupId
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupPartitionMetadataKey()
+                    .setGroupId(groupId),
+                (short) 4
+            ),
+            null // Tombstone.
+        );
+    }
+
+    public static Record newGroupEpochRecord(
+        String groupId,
+        int newGroupEpoch
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataKey()
+                    .setGroupId(groupId),
+                (short) 3
+            ),
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataValue()
+                    .setEpoch(newGroupEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    public static Record newGroupEpochTombstoneRecord(
+        String groupId
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupMetadataKey()
+                    .setGroupId(groupId),
+                (short) 3
+            ),
+            null // Tombstone.
+        );
+    }
+
+    public static Record newTargetAssignmentRecord(
+        String groupId,
+        String memberId,
+        Map<Uuid, Set<Integer>> partitions
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupTargetAssignmentMemberKey()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId),
+                (short) 7
+            ),
+            new ApiMessageAndVersion(
+                new ConsumerGroupTargetAssignmentMemberValue()
+                    .setTopicPartitions(partitions.entrySet().stream()
+                        .map(keyValue -> new ConsumerGroupTargetAssignmentMemberValue.TopicPartition()
+                            .setTopicId(keyValue.getKey())
+                            .setPartitions(new ArrayList<>(keyValue.getValue())))
+                        .collect(Collectors.toList())),
+                (short) 0
+            )
+        );
+    }
+
+    public static Record newTargetAssignmentTombstoneRecord(
+        String groupId,
+        String memberId
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupTargetAssignmentMemberKey()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId),
+                (short) 7
+            ),
+            null // Tombstone.
+        );
+    }
+
+    public static Record newTargetAssignmentEpochRecord(
+        String groupId,
+        int groupEpoch
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupTargetAssignmentMetadataKey()
+                    .setGroupId(groupId),
+                (short) 6
+            ),
+            new ApiMessageAndVersion(
+                new ConsumerGroupTargetAssignmentMetadataValue()
+                    .setAssignmentEpoch(groupEpoch),
+                (short) 0
+            )
+        );
+    }
+
+    public static Record newTargetAssignmentEpochTombstoneRecord(
+        String groupId
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupTargetAssignmentMetadataKey()
+                    .setGroupId(groupId),
+                (short) 6
+            ),
+            null // Tombstone.
+        );
+    }
+
+    public static Record newCurrentAssignmentRecord(
+        String groupId,
+        ConsumerGroupMember member
+    ) {
+        return new Record(
+            new ApiMessageAndVersion(
+                new ConsumerGroupCurrentMemberAssignmentKey()

Review Comment:
   Hmm, I prefer to keep the Member keyword because it suggests that the key is unique per member.
   
   I agree `ConsumerGroupCurrentMemberAssignmentKey/Value` is more readable, but it is also confusing that the word ordering is different (compare `ConsumerGroupTargetAssignmentMemberKey/Value`).



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * A Consumer Group. All the metadata in this class are backed by
+ * records in the __consumer_offsets partitions.
+ */
+public class ConsumerGroup implements Group {
+
+    public enum ConsumerGroupState {
+        EMPTY("empty"),
+        ASSIGNING("assigning"),
+        RECONCILING("reconciling"),
+        STABLE("stable"),
+        DEAD("dead");
+
+        private final String name;
+
+        ConsumerGroupState(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<ConsumerGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the subscriptions
+     * are updated and it will trigger the computation of a new assignment
+     * for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, ConsumerGroupMember> members;
+
+    /**
+     * The metadata of the subscribed topics.
+     */
+    private final TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata;
+
+    /**
+     * The assignment epoch. An assignment epoch smaller than the group epoch means
+     * that a new assignment is required. The assignment epoch is updated when a new
+     * assignment is installed.
+     */
+    private final TimelineInteger assignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private final TimelineHashMap<String, ConsumerGroupMemberAssignment> assignments;
+
+    /**
+     * The current partition epoch maps each topic-partitions to their current epoch where
+     * the epoch is the epoch of their owners. When a member revokes a partition, it removes
+     * itself from this map. When a member gets a partition, it adds itself to this map.
+     */
+    private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
+
+    public ConsumerGroup(
+        SnapshotRegistry snapshotRegistry,
+        String groupId
+    ) {
+        this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+        this.groupId = Objects.requireNonNull(groupId);
+        this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY);
+        this.groupEpoch = new TimelineInteger(snapshotRegistry);
+        this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.assignmentEpoch = new TimelineInteger(snapshotRegistry);
+        this.assignments = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
+    }
+
+    /**
+     * The type of this group.
+     *
+     * @return The group type (Consumer).
+     */
+    @Override
+    public GroupType type() {
+        return GroupType.CONSUMER;
+    }
+
+    /**
+     * The state of this group.
+     *
+     * @return The current state as a String.
+     */
+    @Override
+    public String stateAsString() {
+        return state.get().toString();
+    }
+
+    /**
+     * The group id.
+     *
+     * @return The group id.
+     */
+    @Override
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * The state of this group.
+     *
+     * @return The current state.
+     */
+    public ConsumerGroupState state() {
+        return state.get();
+    }
+
+    /**
+     * Returns the current group epoch.
+     *
+     * @return The group epoch.
+     */
+    public int groupEpoch() {
+        return groupEpoch.get();
+    }
+
+    /**
+     * Sets the group epoch.
+     *
+     * @param groupEpoch The new group epoch.
+     */
+    public void setGroupEpoch(int groupEpoch) {
+        this.groupEpoch.set(groupEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Returns the current assignment epoch.
+     *
+     * @return The current assignment epoch.
+     */
+    public int assignmentEpoch() {
+        return assignmentEpoch.get();
+    }
+
+    /**
+     * Sets the assignment epoch.
+     *
+     * @param assignmentEpoch The new assignment epoch.
+     */
+    public void setAssignmentEpoch(int assignmentEpoch) {
+        this.assignmentEpoch.set(assignmentEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Gets or creates a member.
+     *
+     * @param memberId          The member id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     */
+    public ConsumerGroupMember getOrMaybeCreateMember(
+        String memberId,
+        boolean createIfNotExists
+    ) {
+        ConsumerGroupMember member = members.get(memberId);
+        if (member == null) {
+            if (!createIfNotExists) {
+                throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
+                    memberId, groupId));
+            }
+            member = new ConsumerGroupMember.Builder(memberId).build();
+            members.put(memberId, member);
+        }
+
+        return member;
+    }
+
+    /**
+     * Updates the member.
+     *
+     * @param newMember The new member state.
+     */
+    public void updateMember(ConsumerGroupMember newMember) {
+        ConsumerGroupMember oldMember = members.put(newMember.memberId(), newMember);
+        maybeUpdatePartitionEpoch(oldMember, newMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Remove the member from the group.
+     *
+     * @param memberId The member id to remove.
+     */
+    public void removeMember(String memberId) {
+        ConsumerGroupMember member = members.remove(memberId);
+        maybeRemovePartitionEpoch(member);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Returns true if the member exists.
+     *
+     * @param memberId The member id.
+     *
+     * @return A boolean indicating whether the member exists or not.
+     */
+    public boolean hasMember(String memberId) {
+        return members.containsKey(memberId);
+    }
+
+    /**
+     * Returns the number of members in the group.
+     *
+     * @return The number of members.
+     */
+    public int numMembers() {
+        return members.size();
+    }
+
+    /**
+     * Returns the members keyed by their id.
+     *
+     * @return A immutable Map containing all the members.
+     */
+    public Map<String, ConsumerGroupMember> members() {
+        return Collections.unmodifiableMap(members);
+    }
+
+    /**
+     * Returns the current target assignment of the member.
+     *
+     * @return The ConsumerGroupMemberAssignment or an EMPTY one if it does not
+     *         exist.
+     */
+    public ConsumerGroupMemberAssignment targetAssignment(String memberId) {
+        return assignments.getOrDefault(memberId, ConsumerGroupMemberAssignment.EMPTY);
+    }
+
+    /**
+     * Updates target assignment of a member.
+     *
+     * @param memberId              The member id.
+     * @param newTargetAssignment   The new target assignment.
+     */
+    public void updateTargetAssignment(String memberId, ConsumerGroupMemberAssignment newTargetAssignment) {
+        assignments.put(memberId, newTargetAssignment);
+    }
+
+    /**
+     * Removes the target assignment of a member.
+     *
+     * @param memberId The member id.
+     */
+    public void removeTargetAssignment(String memberId) {
+        assignments.remove(memberId);
+    }
+
+    /**
+     * Returns the target assignments for the entire group.
+     *
+     * @return A immutable Map containing all the target assignments.
+     */
+    public Map<String, ConsumerGroupMemberAssignment> targetAssignments() {
+        return Collections.unmodifiableMap(assignments);
+    }
+
+    /**
+     * Returns the current epoch of a partition or -1 if the partition
+     * does not have one.
+     *
+     * @param topicId       The topic id.
+     * @param partitionId   The partition id.
+     *
+     * @return The epoch or -1.
+     */
+    public int currentPartitionEpoch(
+        Uuid topicId, int partitionId
+    ) {
+        Map<Integer, Integer> partitions = currentPartitionEpoch.get(topicId);
+        if (partitions == null) {
+            return -1;
+        } else {
+            return partitions.getOrDefault(partitionId, -1);
+        }
+    }
+
+    /**
+     * Compute the preferred (server side) assignor for the group while
+     * using the provided assignor for the member.
+     *
+     * @param updatedMemberId       The member id.
+     * @param serverAssignorNameOpt The assignor name.
+     *
+     * @return An Optional containing the preferred assignor.
+     */
+    public Optional<String> preferredServerAssignor(
+        String updatedMemberId,
+        Optional<String> serverAssignorNameOpt
+    ) {
+        Map<String, Integer> counts = new HashMap<>();
+
+        serverAssignorNameOpt.ifPresent(serverAssignorName ->
+            counts.put(serverAssignorName, 1)
+        );
+
+        members.forEach((memberId, member) -> {
+            if (!memberId.equals(updatedMemberId) && member.serverAssignorName().isPresent()) {
+                counts.compute(member.serverAssignorName().get(), (k, v) -> v == null ? 1 : v + 1);
+            }
+        });
+
+        return counts.entrySet().stream()
+            .max(Map.Entry.comparingByValue())
+            .map(Map.Entry::getKey);
+    }
+
+    /**
+     * Returns the subscription metadata for all the topics whose
+     * members are subscribed to.
+     *
+     * @return An immutable Map containing the subscription metadata.
+     */
+    public Map<String, TopicMetadata> subscriptionMetadata() {
+        return Collections.unmodifiableMap(subscribedTopicMetadata);
+    }
+
+    /**
+     * Updates the subscription metadata. This replace the previous one.
+     *
+     * @param subscriptionMetadata The new subscription metadata.
+     */
+    public void setSubscriptionMetadata(
+        Map<String, TopicMetadata> subscriptionMetadata
+    ) {
+        this.subscribedTopicMetadata.clear();
+        this.subscribedTopicMetadata.putAll(subscriptionMetadata);
+    }
+
+    /**
+     * Computes new subscription metadata but with specific information for
+     * a member.
+     *
+     * @param updatedMemberId               The member id.
+     * @param updatedMemberSubscriptions    The member's updated topic subscriptions.
+     * @param topicsImage                   The topics metadata.
+     *
+     * @return The new subscription metadata as an immutable Map.
+     */
+    public Map<String, TopicMetadata> computeSubscriptionMetadata(
+        String updatedMemberId,
+        List<String> updatedMemberSubscriptions,
+        TopicsImage topicsImage
+    ) {
+        Map<String, TopicMetadata> newSubscriptionMetadata = new HashMap<>(subscriptionMetadata().size());
+
+        Consumer<List<String>> updateSubscription = subscribedTopicNames -> {
+            subscribedTopicNames.forEach(topicName ->
+                newSubscriptionMetadata.computeIfAbsent(topicName, __ -> {
+                    TopicImage topicImage = topicsImage.getTopic(topicName);
+                    if (topicImage == null) {
+                        return null;
+                    } else {
+                        return new TopicMetadata(
+                            topicImage.id(),
+                            topicImage.name(),
+                            topicImage.partitions().size()
+                        );
+                    }
+                })
+            );
+        };
+
+        if (updatedMemberSubscriptions != null) {
+            updateSubscription.accept(updatedMemberSubscriptions);
+        }
+
+        members.forEach((memberId, member) -> {
+            if (!memberId.equals(updatedMemberId)) {
+                updateSubscription.accept(member.subscribedTopicNames());
+            }
+        });
+

Review Comment:
   It's a bit unfortunate that we iterate through the entire members map every time a single member changes its subscription. I wonder if we could keep a map of TopicMetadata to a set of member ids instead.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMemberAssignment;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a response and records to
+ *    mutate the hard state. Those records will be written by the runtime and applied to the
+ *    hard state via the replay methods.
+ * 2) The replay methods which apply records to the hard state. Those are used in the request
+ *    handling as well as during the initial loading of the records from the partitions.
+ */
+public class GroupMetadataManager {
+
+    /**
+     * Builder for {@link GroupMetadataManager}. Only the assignors are mandatory; the
+     * log context, the snapshot registry and the topics image are given defaults by
+     * {@link #build()} when they are not set.
+     */
+    public static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+        private List<PartitionAssignor> assignors = null;
+        private TopicsImage topicsImage = null;
+        // No limit on the group size unless one is configured.
+        private int consumerGroupMaxSize = Integer.MAX_VALUE;
+        // Default heartbeat interval, in milliseconds.
+        private int consumerGroupHeartbeatIntervalMs = 5000;
+
+        Builder withLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        Builder withAssignors(List<PartitionAssignor> assignors) {
+            this.assignors = assignors;
+            return this;
+        }
+
+        Builder withConsumerGroupMaxSize(int consumerGroupMaxSize) {
+            this.consumerGroupMaxSize = consumerGroupMaxSize;
+            return this;
+        }
+
+        Builder withConsumerGroupHeartbeatInterval(int consumerGroupHeartbeatIntervalMs) {
+            this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+            return this;
+        }
+
+        Builder withTopicsImage(TopicsImage topicsImage) {
+            this.topicsImage = topicsImage;
+            return this;
+        }
+
+        /**
+         * Builds the manager.
+         *
+         * @return A new GroupMetadataManager.
+         * @throws IllegalStateException if no assignors were provided.
+         */
+        GroupMetadataManager build() {
+            // Fill in the optional dependencies. The defaults are stored in the builder's
+            // fields, so they are reused if build() is called again.
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+            if (topicsImage == null) topicsImage = TopicsImage.EMPTY;
+
+            // At least one assignor is required because the first one is used as the default.
+            if (assignors == null || assignors.isEmpty()) {
+                throw new IllegalStateException("Assignors must be set before building.");
+            }
+
+            return new GroupMetadataManager(
+                snapshotRegistry,
+                logContext,
+                assignors,
+                topicsImage,
+                consumerGroupMaxSize,
+                consumerGroupHeartbeatIntervalMs
+            );
+        }
+    }
+
+    /**
+     * The logger.
+     */
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The supported assignors keyed by their name.
+     */
+    private final Map<String, PartitionAssignor> assignors;
+
+    /**
+     * The default assignor, i.e. the first one of the configured list of assignors.
+     */
+    private final PartitionAssignor defaultAssignor;
+
+    /**
+     * The generic and consumer groups keyed by their name.
+     */
+    private final TimelineHashMap<String, Group> groups;
+
+    /**
+     * The generic and consumer groups keyed by their subscribed topics.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<Group>> groupsByTopicName;
+
+    /**
+     * The maximum number of members allowed in a single consumer group.
+     */
+    private final int consumerGroupMaxSize;
+
+    /**
+     * The heartbeat interval for consumer groups.
+     */
+    private final int consumerGroupHeartbeatIntervalMs;
+
+    /**
+     * The topics metadata (or image).
+     */
+    private TopicsImage topicsImage;
+
+    /**
+     * Creates the manager. The constructor is private; instances are obtained
+     * through the {@link Builder}.
+     *
+     * @param snapshotRegistry                  The snapshot registry used by the timeline maps.
+     * @param logContext                        The log context used to create the logger.
+     * @param assignors                         The supported assignors; the first one is the default.
+     * @param topicsImage                       The initial topics metadata (or image).
+     * @param consumerGroupMaxSize              The maximum number of members in a consumer group.
+     * @param consumerGroupHeartbeatIntervalMs  The heartbeat interval for consumer groups.
+     */
+    private GroupMetadataManager(
+        SnapshotRegistry snapshotRegistry,
+        LogContext logContext,
+        List<PartitionAssignor> assignors,
+        TopicsImage topicsImage,
+        int consumerGroupMaxSize,
+        int consumerGroupHeartbeatIntervalMs
+    ) {
+        // Logger, collaborators and static configuration.
+        this.log = logContext.logger(GroupMetadataManager.class);
+        this.snapshotRegistry = snapshotRegistry;
+        this.topicsImage = topicsImage;
+        this.consumerGroupMaxSize = consumerGroupMaxSize;
+        this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+
+        // Index the assignors by name; the head of the list is the default one.
+        Map<String, PartitionAssignor> assignorsByName = assignors.stream()
+            .collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
+        this.assignors = assignorsByName;
+        this.defaultAssignor = assignors.get(0);
+
+        // Timeline (snapshot-able) state, initially empty.
+        this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.groupsByTopicName = new TimelineHashMap<>(snapshotRegistry, 0);
+    }
+
+    /**
+     * Looks up a consumer group by its id and optionally creates it when it is missing.
+     *
+     * @param groupId           The group id.
+     * @param createIfNotExists Whether a new consumer group should be created (and
+     *                          registered) when no group exists with this id.
+     *
+     * @return The existing or newly created ConsumerGroup.
+     * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
+     *                                  if the group exists but is not a consumer group.
+     */
+    // Package private for testing.
+    ConsumerGroup getOrMaybeCreateConsumerGroup(
+        String groupId,
+        boolean createIfNotExists
+    ) throws GroupIdNotFoundException {
+        Group existingGroup = groups.get(groupId);
+
+        if (existingGroup != null) {
+            // Upgrading/downgrading between protocols is not supported at the moment so
+            // a group of another type is reported as not found.
+            if (existingGroup.type() != Group.GroupType.CONSUMER) {
+                throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
+            }
+            return (ConsumerGroup) existingGroup;
+        }
+
+        if (!createIfNotExists) {
+            throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
+        }
+
+        ConsumerGroup newGroup = new ConsumerGroup(snapshotRegistry, groupId);
+        groups.put(groupId, newGroup);
+        return newGroup;
+    }
+
+    /**
+     * Drops the group with the given id from the groups map.
+     *
+     * @param groupId The id of the group to remove.
+     */
+    private void removeGroup(String groupId) {
+        groups.remove(groupId);
+    }
+
+    /**
+     * Validates the request.
+     *
+     * @param request The request to validate.
+     *
+     * @throws InvalidRequestException if the request is not valid.
+     * @throws UnsupportedAssignorException if the assignor is not supported.
+     */
+    private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
+        ConsumerGroupHeartbeatRequestData request
+    ) throws InvalidRequestException, UnsupportedAssignorException {
+        if (request.groupId().isEmpty()) {
+            throw new InvalidRequestException("GroupId can't be empty.");
+        }
+
+        if (request.memberEpoch() > 0 || request.memberEpoch() == -1) {
+            if (request.memberId().isEmpty()) {
+                throw new InvalidRequestException("MemberId can't be empty.");
+            }
+            if (request.instanceId() != null) {
+                throw new InvalidRequestException("InstanceId should only be provided in first request.");
+            }
+            if (request.rackId() != null) {
+                throw new InvalidRequestException("RackId should only be provided in first request.");
+            }
+        } else if (request.memberEpoch() == 0) {
+            if (request.rebalanceTimeoutMs() == -1) {
+                throw new InvalidRequestException("RebalanceTimeoutMs must be provided in first request.");
+            }
+            if (request.topicPartitions() == null || !request.topicPartitions().isEmpty()) {
+                throw new InvalidRequestException("TopicPartitions must be empty when (re-)joining.");
+            }
+            if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
+                throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
+            }
+            if (request.serverAssignor() != null && !assignors.containsKey(request.serverAssignor())) {
+                throw new UnsupportedAssignorException("ServerAssignor " + request.serverAssignor()
+                    + " is not supported. Supported assignors: " + String.join(", ", assignors.keySet())
+                    + ".");
+            }
+        } else {
+            throw new InvalidRequestException("MemberEpoch is invalid.");
+        }
+
+        if (request.subscribedTopicRegex() != null) {
+            throw new InvalidRequestException("SubscribedTopicRegex is not supported yet.");
+        }
+
+        if (request.clientAssignors() != null) {
+            // TODO We need to remove them from the request.
+            throw new InvalidRequestException("Client side assignors are not supported yet.");
+        }
+    }
+
+    /**
+     * Checks whether the partitions currently owned by the member (the ones set in the
+     * request) are all part of the partitions that the member should own, i.e. whether
+     * the owned partitions are a subset of the target.
+     *
+     * @param ownedTopicPartitions  The partitions provided by the client in the request.
+     * @param target                The partitions that the client should have.
+     *
+     * @return True if every owned partition is contained in the target; false otherwise
+     *         or if no owned partitions were provided (null).
+     */
+    private boolean isSubset(
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
+        Map<Uuid, Set<Integer>> target
+    ) {
+        if (ownedTopicPartitions == null) return false;
+
+        return ownedTopicPartitions.stream().allMatch(owned -> {
+            // A topic which is absent from the target cannot be owned at all.
+            Set<Integer> targetPartitions = target.get(owned.topicId());
+            return targetPartitions != null && targetPartitions.containsAll(owned.partitions());
+        });
+    }
+
+    /**
+     * Rejects a new member when the consumer group has already reached the configured
+     * maximum group size. Existing members are always accepted.
+     *
+     * @param group     The consumer group.
+     * @param memberId  The member id; empty if the member does not have one yet.
+     *
+     * @throws GroupMaxSizeReachedException if the maximum capacity has been reached.
+     */
+    private void throwIfConsumerGroupIsFull(
+        ConsumerGroup group,
+        String memberId
+    ) throws GroupMaxSizeReachedException {
+        // There is still room in the group.
+        if (group.numMembers() < consumerGroupMaxSize) return;
+
+        // The group is full but the caller is already one of its members.
+        if (!memberId.isEmpty() && group.hasMember(memberId)) return;
+
+        throw new GroupMaxSizeReachedException("The consumer group has reached its maximum capacity of "
+            + consumerGroupMaxSize + " members.");
+    }
+
+    /**
+     * Compares the member epoch received in the heartbeat request with the epoch known
+     * by this coordinator and rejects the request if they are not compatible.
+     *
+     * @param member                The consumer group member.
+     * @param memberEpoch           The member epoch from the request.
+     * @param ownedTopicPartitions  The owned partitions from the request.
+     *
+     * @throws NotCoordinatorException if the provided epoch is ahead of the epoch known
+     *                                 by this coordinator. This suggests that the member
+     *                                 got a higher epoch from another coordinator.
+     * @throws FencedMemberEpochException if the provided epoch is behind the epoch known
+     *                                    by this coordinator, unless it is the previous
+     *                                    epoch and the owned partitions are a subset of
+     *                                    the assigned ones.
+     */
+    private void throwIfMemberEpochIsInvalid(
+        ConsumerGroupMember member,
+        int memberEpoch,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
+    ) {
+        // Epochs match: nothing to verify.
+        if (memberEpoch == member.memberEpoch()) return;
+
+        if (memberEpoch > member.memberEpoch()) {
+            // The member has likely got a bump from another coordinator and this coordinator
+            // is stale. Return NOT_COORDINATOR to force the member to refresh its coordinator.
+            throw new NotCoordinatorException("The consumer group member has got a larger member "
+                + "epoch (" + memberEpoch + ") than the one known by this group coordinator ("
+                + member.memberEpoch() + ").");
+        }
+
+        // The member is behind. It is tolerated only if it comes with the previous epoch and
+        // owns a subset of the current assignment, because the response carrying the bumped
+        // epoch may have been lost.
+        boolean lostResponseRetry = memberEpoch == member.previousMemberEpoch()
+            && isSubset(ownedTopicPartitions, member.assigned());
+        if (!lostResponseRetry) {
+            throw new FencedMemberEpochException("The consumer group member has an old member "
+                + "epoch. The member must abandon all its partitions and rejoin.");
+        }
+    }
+
+    /**
+     * Builds the assignment part of the heartbeat response for the given member. The
+     * assigned partitions are always set; the pending partitions are only set while
+     * the member is in the ASSIGNING state.
+     *
+     * @param member The consumer group member.
+     *
+     * @return The assignment to put in the response.
+     */
+    private ConsumerGroupHeartbeatResponseData.Assignment createResponseAssignment(
+        ConsumerGroupMember member
+    ) {
+        ConsumerGroupHeartbeatResponseData.Assignment responseAssignment =
+            new ConsumerGroupHeartbeatResponseData.Assignment();
+        responseAssignment.setAssignedTopicPartitions(fromAssignmentMap(member.assigned()));
+
+        boolean isAssigning = member.state() == ConsumerGroupMember.MemberState.ASSIGNING;
+        if (isAssigning) {
+            responseAssignment.setPendingTopicPartitions(fromAssignmentMap(member.assigning()));
+        }
+
+        return responseAssignment;
+    }
+
+    /**
+     * Converts an assignment expressed as a map of topic ids to partition ids into
+     * the list representation used by the heartbeat response.
+     *
+     * @param assignment The partitions keyed by topic id.
+     *
+     * @return One TopicPartitions per entry of the map, holding a copy of its partitions.
+     */
+    private List<ConsumerGroupHeartbeatResponseData.TopicPartitions> fromAssignmentMap(
+        Map<Uuid, Set<Integer>> assignment
+    ) {
+        List<ConsumerGroupHeartbeatResponseData.TopicPartitions> topicPartitionsList = new ArrayList<>();
+        for (Map.Entry<Uuid, Set<Integer>> topicEntry : assignment.entrySet()) {
+            ConsumerGroupHeartbeatResponseData.TopicPartitions topicPartitions =
+                new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                    .setTopicId(topicEntry.getKey())
+                    .setPartitions(new ArrayList<>(topicEntry.getValue()));
+            topicPartitionsList.add(topicPartitions);
+        }
+        return topicPartitionsList;
+    }
+
+    /**
+     * Wraps an int which uses -1 as the "not set" sentinel into an OptionalInt.
+     *
+     * @param value The raw value, or -1 if it is not set.
+     *
+     * @return An empty OptionalInt if the value is -1; an OptionalInt holding the value otherwise.
+     */
+    private OptionalInt ofSentinel(int value) {
+        if (value == -1) {
+            return OptionalInt.empty();
+        }
+        return OptionalInt.of(value);
+    }
+
+    /**
+     * Handles a regular heartbeat from a consumer group member. The request is processed
+     * in three steps: the member's subscription is updated, the target assignment of the
+     * group is recomputed if the group epoch is ahead of it, and the member's current
+     * assignment is reconciled with the target assignment.
+     *
+     * @param groupId               The group id from the request.
+     * @param memberId              The member id from the request.
+     * @param memberEpoch           The member epoch from the request.
+     * @param instanceId            The instance id from the request or null.
+     * @param rackId                The rack id from the request or null.
+     * @param rebalanceTimeoutMs    The rebalance timeout from the request or -1.
+     * @param clientId              The client id.
+     * @param clientHost            The client host.
+     * @param subscribedTopicNames  The list of subscribed topic names from the request
+     *                              or null.
+     * @param subscribedTopicRegex  The regular expression based subscription from the
+     *                              request or null.
+     * @param assignorName          The assignor name from the request or null.
+     * @param ownedTopicPartitions  The list of owned partitions from the request or null.
+     *
+     * @return A Result containing the ConsumerGroupHeartbeat response and
+     *         a list of records to update the state machine.
+     * @throws UnknownServerException if the assignor fails to compute a new target assignment.
+     */
+    private Result<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
+        String groupId,
+        String memberId,
+        int memberEpoch,
+        String instanceId,
+        String rackId,
+        int rebalanceTimeoutMs,
+        String clientId,
+        String clientHost,
+        List<String> subscribedTopicNames,
+        String subscribedTopicRegex,
+        String assignorName,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
+    ) throws ApiException {
+        List<Record> records = new ArrayList<>();
+        // The group and the member may only be created when the member (re-)joins,
+        // that is when it comes with a member epoch of 0.
+        boolean createIfNotExists = memberEpoch == 0;
+
+        ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, createIfNotExists);
+        throwIfConsumerGroupIsFull(group, memberId);
+
+        // Generate a member id if the request did not carry one.
+        if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+        throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+
+        if (memberEpoch == 0) {
+            log.info("[GroupId " + groupId + "] Member " + memberId + " re-joins the consumer group.");
+        }
+
+        // Update the subscription part of the member if we received new values. If the member has
+        // changed, we write it to the log. If the subscribed topics have changed, we also recompute
+        // the subscription metadata.
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
+        ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(member)
+            .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
+            .maybeUpdateRackId(Optional.ofNullable(rackId))
+            .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
+            .maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
+            .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+            .maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))
+            .setClientId(clientId)
+            .setClientHost(clientHost)
+            .build();
+
+        if (!updatedMember.equals(member)) {
+            records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+
+            if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
+                log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " +
+                    updatedMember.subscribedTopicNames());
+
+                subscriptionMetadata = group.computeSubscriptionMetadata(
+                    updatedMember.memberId(),
+                    updatedMember.subscribedTopicNames(),
+                    topicsImage
+                );
+
+                // The subscription metadata record is only written when the metadata has changed.
+                if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                    log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+                        + subscriptionMetadata + ".");
+                    records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
+                }
+
+                // A change of the subscribed topics always bumps the group epoch.
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch));
+
+                log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
+            }
+
+            member = updatedMember;
+        }
+
+        // Update target assignment if needed. If the new target has any changes, we write the
+        // changes to the log.
+        int targetAssignmentEpoch = group.assignmentEpoch();
+        ConsumerGroupMemberAssignment targetAssignment = group.targetAssignment(memberId);
+        // The target assignment is stale when the group epoch is ahead of the assignment epoch.
+        if (groupEpoch > targetAssignmentEpoch) {
+            String preferredServerAssignor = group.preferredServerAssignor(
+                member.memberId(),
+                member.serverAssignorName()
+            ).orElse(defaultAssignor.name());
+
+            try {
+                TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+                    new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(preferredServerAssignor))
+                        .withMembers(group.members())
+                        .withSubscriptionMetadata(subscriptionMetadata)
+                        .withTargetAssignments(group.targetAssignments())
+                        .withUpdatedMember(member.memberId(), member)
+                        .build();
+
+                log.info("[GroupId " + groupId + "] Computed a new target assignment for epoch " + groupEpoch + ": "
+                    + assignmentResult.assignments() + ".");
+
+                records.addAll(assignmentResult.records());
+                targetAssignment = assignmentResult.assignments().get(member.memberId());
+                targetAssignmentEpoch = groupEpoch;
+            } catch (PartitionAssignorException ex) {
+                String msg = "Failed to compute a new target assignment for epoch " + groupEpoch + ": " + ex + ".";
+                log.error("[GroupId " + groupId + "] " + msg);
+                throw new UnknownServerException(msg, ex);
+            }
+        }
+
+        // If the member is stable and its next epoch matches the current target epoch
+        // of the assignment, we can skip this reconciliation.
+        boolean assignmentUpdated = false;
+        if (member.state() != ConsumerGroupMember.MemberState.STABLE
+            || member.nextMemberEpoch() != targetAssignmentEpoch) {
+            updatedMember = new CurrentAssignmentBuilder(member)
+                .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+                .withCurrentPartitionEpoch(group::currentPartitionEpoch)
+                .withOwnedTopicPartitions(ownedTopicPartitions)
+                .build();
+
+            // Checking the reference is enough here because a new instance
+            // is created only when the state has changed.
+            if (updatedMember != member) {
+                assignmentUpdated = true;
+                records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+                log.info("[GroupId " + groupId + "] Member " + memberId + " transitioned from " +
+                    member.currentAssignmentSummary() + " to " + updatedMember.currentAssignmentSummary() + ".");
+
+                // TODO(dajac) Starts or restarts the timer for the revocation timeout.
+
+                member = updatedMember;
+            }
+        }
+
+        // TODO(dajac) Starts or restarts the timer for the session timeout.
+
+        ConsumerGroupHeartbeatResponseData response = new ConsumerGroupHeartbeatResponseData()
+            .setMemberId(member.memberId())
+            .setMemberEpoch(member.memberEpoch())
+            .setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs);
+
+        // The assignment is only included in the response when the member reported its owned
+        // partitions, when it (re-)joined or when its current assignment has been updated.
+        if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) {
+            response.setAssignment(createResponseAssignment(member));
+        }
+
+        return new Result<>(records, response);
+    }
+
+    /**
+     * Handles leave request from a consumer group member. Tombstones are written for
+     * the member, the subscription metadata is recomputed without it, the group epoch
+     * is bumped and a new target assignment is computed for the remaining members.
+     *
+     * @param groupId       The group id from the request.
+     * @param memberId      The member id from the request.
+     *
+     * @return A Result containing the ConsumerGroupHeartbeat response and
+     *         a list of records to update the state machine.
+     * @throws UnknownServerException if the assignor fails to compute a new target assignment.
+     */
+    private Result<ConsumerGroupHeartbeatResponseData> consumerGroupLeave(
+        String groupId,
+        String memberId
+    ) throws ApiException {
+        List<Record> records = new ArrayList<>();
+
+        // Neither the group nor the member is created here (createIfNotExists is false).
+        ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
+        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+
+        log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
+
+        // Write tombstones for the member. The order matters here.
+        records.add(newCurrentAssignmentTombstoneRecord(groupId, memberId));
+        records.add(newTargetAssignmentTombstoneRecord(groupId, memberId));
+        records.add(newMemberSubscriptionTombstoneRecord(groupId, memberId));
+
+        // We update the subscription metadata without the leaving member.
+        Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
+            memberId,
+            null,
+            topicsImage
+        );
+
+        // The subscription metadata record is only written when the metadata has changed.
+        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+            log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+                + subscriptionMetadata + ".");
+            records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
+        }
+
+        // We bump the group epoch.
+        int groupEpoch = group.groupEpoch() + 1;
+        records.add(newGroupEpochRecord(groupId, groupEpoch));
+
+        // We update the target assignment for the group and write it to
+        // the log.
+        String assignorName = group.preferredServerAssignor(
+            member.memberId(),
+            Optional.empty()
+        ).orElse(defaultAssignor.name());
+
+        try {
+            // NOTE(review): unlike the regular heartbeat path, the builder is not given the
+            // existing target assignments (group.targetAssignments()) here - confirm that
+            // this is intentional.
+            TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+                new TargetAssignmentBuilder(groupId, groupEpoch, assignors.get(assignorName))
+                    .withMembers(group.members())
+                    .withSubscriptionMetadata(subscriptionMetadata)
+                    .withRemoveMembers(member.memberId())
+                    .build();
+
+            log.info("[GroupId " + groupId + "] Computed a new target assignment for epoch " + groupEpoch + ": "
+                + assignmentResult.assignments() + ".");
+
+            records.addAll(assignmentResult.records());
+        } catch (PartitionAssignorException ex) {
+            String msg = "Failed to compute a new target assignment for epoch " + groupEpoch + ": " + ex + ".";
+            log.error("[GroupId " + groupId + "] " + msg);
+            throw new UnknownServerException(msg, ex);
+        }
+
+        // The response echoes the member id with the member epoch set to -1.
+        return new Result<>(records, new ConsumerGroupHeartbeatResponseData()
+            .setMemberId(memberId)
+            .setMemberEpoch(-1)
+        );
+    }
+
+    /**
+     * Handles a ConsumerGroupHeartbeat request.
+     *
+     * @param context The request context.
+     * @param request The actual ConsumerGroupHeartbeat request.
+     *
+     * @return A Result containing the ConsumerGroupHeartbeat response and
+     *         a list of records to update the state machine.
+     */
+    public Result<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
+        RequestContext context,
+        ConsumerGroupHeartbeatRequestData request
+    ) throws ApiException {
+        // Validate the request first; nothing below runs if this throws.
+        throwIfConsumerGroupHeartbeatRequestIsInvalid(request);
+
+        // A member epoch of -1 means that the member wants to leave the group.
+        final boolean memberIsLeaving = request.memberEpoch() == -1;
+        if (memberIsLeaving) {
+            return consumerGroupLeave(request.groupId(), request.memberId());
+        }
+
+        // Any other member epoch is a regular heartbeat. The client id and the
+        // client address come from the request context, everything else from
+        // the request itself.
+        return consumerGroupHeartbeat(
+            request.groupId(),
+            request.memberId(),
+            request.memberEpoch(),
+            request.instanceId(),
+            request.rackId(),
+            request.rebalanceTimeoutMs(),
+            context.clientId(),
+            context.clientAddress.toString(),
+            request.subscribedTopicNames(),
+            request.subscribedTopicRegex(),
+            request.serverAssignor(),
+            request.topicPartitions()
+        );
+    }
+
+    /**
+     * Replays ConsumerGroupMemberMetadataKey/Value to update the hard state of
+     * the consumer group.
+     *
+     * @param key   A ConsumerGroupMemberMetadataKey key.
+     * @param value A ConsumerGroupMemberMetadataValue record.
+     */
+    public void replay(
+        ConsumerGroupMemberMetadataKey key,
+        ConsumerGroupMemberMetadataValue value
+    ) {
+        final String groupId = key.groupId();
+        final String memberId = key.memberId();
+
+        if (value == null) {
+            // A null value is a tombstone: the member must be removed. Neither the
+            // group nor the member is created on this path (flag is false).
+            ConsumerGroup existingGroup = getOrMaybeCreateConsumerGroup(groupId, false);
+            ConsumerGroupMember existingMember = existingGroup.getOrMaybeCreateMember(memberId, false);
+
+            // The member epoch must already be -1 before the member can be deleted.
+            if (existingMember.memberEpoch() != -1) {
+                throw new IllegalStateException("Received a tombstone record to delete member " + memberId
+                    + " but did not receive ConsumerGroupCurrentMemberAssignmentValue tombstone.");
+            }
+
+            // Likewise, the member must no longer have a target assignment.
+            if (existingGroup.targetAssignments().containsKey(memberId)) {
+                throw new IllegalStateException("Received a tombstone record to delete member " + memberId
+                    + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
+            }
+
+            existingGroup.removeMember(memberId);
+            return;
+        }
+
+        // A non-null value carries member metadata: fetch the group and the member
+        // (creation is allowed here, flag is true) and merge the record into a
+        // copy of the current member.
+        ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, true);
+        ConsumerGroupMember currentMember = group.getOrMaybeCreateMember(memberId, true);
+        group.updateMember(
+            new ConsumerGroupMember.Builder(currentMember)
+                .mergeWith(value)
+                .build()
+        );
+    }
+
+    /**
+     * Replays ConsumerGroupMetadataKey/Value to update the hard state of
+     * the consumer group.
+     *
+     * @param key   A ConsumerGroupMetadataKey key.
+     * @param value A ConsumerGroupMetadataValue record.
+     */
+    public void replay(
+        ConsumerGroupMetadataKey key,
+        ConsumerGroupMetadataValue value
+    ) {
+        String groupId = key.groupId();
+
+        if (value != null) {
+            // Regular record: fetch the group (creation is allowed here, flag is
+            // true) and record its new epoch.
+            ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true);
+            consumerGroup.setGroupEpoch(value.epoch());
+        } else {
+            // Tombstone: the group may only be removed once everything hanging off
+            // it is gone. Each check below reports a distinct message so that the
+            // offending piece of state can be identified from the exception alone.
+            ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, false);
+            if (!consumerGroup.members().isEmpty()) {
+                throw new IllegalStateException("Received a tombstone record to delete group " + groupId
+                    + " but the group still has " + consumerGroup.members().size() + " members.");
+            }
+            // The target assignments are tracked separately from the members, so
+            // name them explicitly rather than reusing the "group still has N
+            // members" wording of the check above.
+            if (!consumerGroup.targetAssignments().isEmpty()) {
+                throw new IllegalStateException("Received a tombstone record to delete group " + groupId
+                    + " but the target assignment still has " + consumerGroup.targetAssignments().size()
+                    + " members.");
+            }
+            // An assignment epoch other than -1 means the target assignment
+            // metadata has not been deleted yet.
+            if (consumerGroup.assignmentEpoch() != -1) {
+                throw new IllegalStateException("Received a tombstone record to delete group " + groupId
+                    + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
+            }
+            removeGroup(groupId);
+        }
+    }
+
+    /**
+     * Replays ConsumerGroupPartitionMetadataKey/Value to update the hard state of

Review Comment:
   Curious as to why the name includes Partition and not Topics. My understanding is that this record holds all subscribed topics per group id.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org