You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/04/25 16:51:01 UTC

[kafka] branch trunk updated: KAFKA-14462; [8/N] Add ConsumerGroupMember (#13538)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9a36da12b73 KAFKA-14462; [8/N] Add ConsumerGroupMember (#13538)
9a36da12b73 is described below

commit 9a36da12b7359b7158332c541655716312efb5b3
Author: David Jacot <dj...@confluent.io>
AuthorDate: Tue Apr 25 18:50:51 2023 +0200

    KAFKA-14462; [8/N] Add ConsumerGroupMember (#13538)
    
    This patch adds ConsumerGroupMember.
    
    Reviewers: Christo Lolov <lo...@amazon.com>, Jeff Kim <je...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 checkstyle/suppressions.xml                        |   6 +
 .../group/consumer/ConsumerGroupMember.java        | 611 +++++++++++++++++++++
 .../group/consumer/ConsumerGroupMemberTest.java    | 307 +++++++++++
 3 files changed, 924 insertions(+)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 214a06deb26..af271ee77bd 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -318,6 +318,12 @@
     <suppress checks="AvoidStarImport"
               files="MetadataVersionTest.java"/>
 
+    <!-- group coordinator -->
+    <suppress checks="CyclomaticComplexity"
+              files="(ConsumerGroupMember).java"/>
+    <suppress checks="ParameterNumber"
+              files="(ConsumerGroupMember).java"/>
+
     <!-- storage -->
     <suppress checks="CyclomaticComplexity"
               files="(LogValidator|RemoteLogManagerConfig).java"/>
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
new file mode 100644
index 00000000000..823368e65e0
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * ConsumerGroupMember contains all the information related to a member
+ * within a consumer group. This class is immutable and is fully backed
+ * by records stored in the __consumer_offsets topic.
+ */
+public class ConsumerGroupMember {
+    /**
+     * A builder used to create a new member or to update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {@link ConsumerGroupMember} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private int memberEpoch = 0;
+        private int previousMemberEpoch = -1;
+        private int nextMemberEpoch = 0;
+        private String instanceId = null;
+        private String rackId = null;
+        private int rebalanceTimeoutMs = -1;
+        private String clientId = "";
+        private String clientHost = "";
+        private List<String> subscribedTopicNames = Collections.emptyList();
+        private String subscribedTopicRegex = "";
+        private String serverAssignorName = null;
+        private List<ClientAssignor> clientAssignors = Collections.emptyList();
+        private Map<Uuid, Set<Integer>> assignedPartitions = Collections.emptyMap();
+        private Map<Uuid, Set<Integer>> partitionsPendingRevocation = Collections.emptyMap();
+        private Map<Uuid, Set<Integer>> partitionsPendingAssignment = Collections.emptyMap();
+
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        public Builder(ConsumerGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.memberEpoch = member.memberEpoch;
+            this.previousMemberEpoch = member.previousMemberEpoch;
+            this.nextMemberEpoch = member.nextMemberEpoch;
+            this.instanceId = member.instanceId;
+            this.rackId = member.rackId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.subscribedTopicNames = member.subscribedTopicNames;
+            this.subscribedTopicRegex = member.subscribedTopicRegex;
+            this.serverAssignorName = member.serverAssignorName;
+            this.clientAssignors = member.clientAssignors;
+            this.assignedPartitions = member.assignedPartitions;
+            this.partitionsPendingRevocation = member.partitionsPendingRevocation;
+            this.partitionsPendingAssignment = member.partitionsPendingAssignment;
+        }
+
+        public Builder setMemberEpoch(int memberEpoch) {
+            this.memberEpoch = memberEpoch;
+            return this;
+        }
+
+        public Builder setPreviousMemberEpoch(int previousMemberEpoch) {
+            this.previousMemberEpoch = previousMemberEpoch;
+            return this;
+        }
+
+        public Builder setNextMemberEpoch(int nextMemberEpoch) {
+            this.nextMemberEpoch = nextMemberEpoch;
+            return this;
+        }
+
+        public Builder setInstanceId(String instanceId) {
+            this.instanceId = instanceId;
+            return this;
+        }
+
+        public Builder maybeUpdateInstanceId(Optional<String> instanceId) {
+            this.instanceId = instanceId.orElse(this.instanceId);
+            return this;
+        }
+
+        public Builder setRackId(String rackId) {
+            this.rackId = rackId;
+            return this;
+        }
+
+        public Builder maybeUpdateRackId(Optional<String> rackId) {
+            this.rackId = rackId.orElse(this.rackId);
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        public Builder maybeUpdateRebalanceTimeoutMs(OptionalInt rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs.orElse(this.rebalanceTimeoutMs);
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setSubscribedTopicNames(List<String> subscribedTopicNames) {
+            // Store a sorted copy: the caller's list (possibly unmodifiable, or owned by a record) is never reordered in place.
+            this.subscribedTopicNames = subscribedTopicNames.stream().sorted(Comparator.naturalOrder()).collect(Collectors.toList());
+            return this;
+        }
+
+        public Builder maybeUpdateSubscribedTopicNames(Optional<List<String>> subscribedTopicNames) {
+            // Sort a copy only when a new value is present; when absent, the current list (possibly shared with an existing member) is left untouched.
+            subscribedTopicNames.ifPresent(names -> this.subscribedTopicNames = names.stream().sorted(Comparator.naturalOrder()).collect(Collectors.toList()));
+            return this;
+        }
+
+        public Builder setSubscribedTopicRegex(String subscribedTopicRegex) {
+            this.subscribedTopicRegex = subscribedTopicRegex;
+            return this;
+        }
+
+        public Builder maybeUpdateSubscribedTopicRegex(Optional<String> subscribedTopicRegex) {
+            this.subscribedTopicRegex = subscribedTopicRegex.orElse(this.subscribedTopicRegex);
+            return this;
+        }
+
+        public Builder setServerAssignorName(String serverAssignorName) {
+            this.serverAssignorName = serverAssignorName;
+            return this;
+        }
+
+        public Builder maybeUpdateServerAssignorName(Optional<String> serverAssignorName) {
+            this.serverAssignorName = serverAssignorName.orElse(this.serverAssignorName);
+            return this;
+        }
+
+        public Builder setClientAssignors(List<ClientAssignor> clientAssignors) {
+            this.clientAssignors = clientAssignors;
+            return this;
+        }
+
+        public Builder maybeUpdateClientAssignors(Optional<List<ClientAssignor>> clientAssignors) {
+            this.clientAssignors = clientAssignors.orElse(this.clientAssignors);
+            return this;
+        }
+
+        public Builder setAssignedPartitions(Map<Uuid, Set<Integer>> assignedPartitions) {
+            this.assignedPartitions = assignedPartitions;
+            return this;
+        }
+
+        public Builder setPartitionsPendingRevocation(Map<Uuid, Set<Integer>> partitionsPendingRevocation) {
+            this.partitionsPendingRevocation = partitionsPendingRevocation;
+            return this;
+        }
+
+        public Builder setPartitionsPendingAssignment(Map<Uuid, Set<Integer>> partitionsPendingAssignment) {
+            this.partitionsPendingAssignment = partitionsPendingAssignment;
+            return this;
+        }
+
+        public Builder updateWith(ConsumerGroupMemberMetadataValue record) {
+            setInstanceId(record.instanceId());
+            setRackId(record.rackId());
+            setClientId(record.clientId());
+            setClientHost(record.clientHost());
+            setSubscribedTopicNames(record.subscribedTopicNames());
+            setSubscribedTopicRegex(record.subscribedTopicRegex());
+            setRebalanceTimeoutMs(record.rebalanceTimeoutMs());
+            setServerAssignorName(record.serverAssignor());
+            setClientAssignors(record.assignors().stream()
+                .map(ClientAssignor::fromRecord)
+                .collect(Collectors.toList()));
+            return this;
+        }
+
+        public Builder updateWith(ConsumerGroupCurrentMemberAssignmentValue record) {
+            setMemberEpoch(record.memberEpoch());
+            setPreviousMemberEpoch(record.previousMemberEpoch());
+            setNextMemberEpoch(record.targetMemberEpoch());
+            setAssignedPartitions(assignmentFromTopicPartitions(record.assignedPartitions()));
+            setPartitionsPendingRevocation(assignmentFromTopicPartitions(record.partitionsPendingRevocation()));
+            setPartitionsPendingAssignment(assignmentFromTopicPartitions(record.partitionsPendingAssignment()));
+            return this;
+        }
+
+        private Map<Uuid, Set<Integer>> assignmentFromTopicPartitions(
+            List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> topicPartitionsList
+        ) {
+            return topicPartitionsList.stream().collect(Collectors.toMap(
+                ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId,
+                topicPartitions -> Collections.unmodifiableSet(new HashSet<>(topicPartitions.partitions()))));
+        }
+
+        public ConsumerGroupMember build() {
+            MemberState state;
+            if (!partitionsPendingRevocation.isEmpty()) {
+                state = MemberState.REVOKING;
+            } else if (!partitionsPendingAssignment.isEmpty()) {
+                state = MemberState.ASSIGNING;
+            } else {
+                state = MemberState.STABLE;
+            }
+
+            return new ConsumerGroupMember(
+                memberId,
+                memberEpoch,
+                previousMemberEpoch,
+                nextMemberEpoch,
+                instanceId,
+                rackId,
+                rebalanceTimeoutMs,
+                clientId,
+                clientHost,
+                subscribedTopicNames,
+                subscribedTopicRegex,
+                serverAssignorName,
+                clientAssignors,
+                state,
+                assignedPartitions,
+                partitionsPendingRevocation,
+                partitionsPendingAssignment
+            );
+        }
+    }
+
+    public enum MemberState {
+        REVOKING("revoking"),
+        ASSIGNING("assigning"),
+        STABLE("stable");
+
+        private final String name;
+
+        MemberState(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The current member epoch.
+     */
+    private final int memberEpoch;
+
+    /**
+     * The previous member epoch.
+     */
+    private final int previousMemberEpoch;
+
+    /**
+     * The next member epoch. This corresponds to the target
+     * assignment epoch used to compute the current assigned,
+     * revoking and assigning partitions.
+     */
+    private final int nextMemberEpoch;
+
+    /**
+     * The instance id provided by the member.
+     */
+    private final String instanceId;
+
+    /**
+     * The rack id provided by the member.
+     */
+    private final String rackId;
+
+    /**
+     * The rebalance timeout provided by the member.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The client id reported by the member.
+     */
+    private final String clientId;
+
+    /**
+     * The host reported by the member.
+     */
+    private final String clientHost;
+
+    /**
+     * The list of subscriptions (topic names) configured by the member.
+     */
+    private final List<String> subscribedTopicNames;
+
+    /**
+     * The subscription pattern configured by the member.
+     */
+    private final String subscribedTopicRegex;
+
+    /**
+     * The server side assignor selected by the member.
+     */
+    private final String serverAssignorName;
+
+    /**
+     * The states of the client side assignors of the member.
+     */
+    private final List<ClientAssignor> clientAssignors;
+
+    /**
+     * The member state.
+     */
+    private final MemberState state;
+
+    /**
+     * The partitions assigned to this member.
+     */
+    private final Map<Uuid, Set<Integer>> assignedPartitions;
+
+    /**
+     * The partitions being revoked by this member.
+     */
+    private final Map<Uuid, Set<Integer>> partitionsPendingRevocation;
+
+    /**
+     * The partitions waiting to be assigned to this
+     * member. They will be assigned when they are
+     * released by their previous owners.
+     */
+    private final Map<Uuid, Set<Integer>> partitionsPendingAssignment;
+
+    private ConsumerGroupMember(
+        String memberId,
+        int memberEpoch,
+        int previousMemberEpoch,
+        int nextMemberEpoch,
+        String instanceId,
+        String rackId,
+        int rebalanceTimeoutMs,
+        String clientId,
+        String clientHost,
+        List<String> subscribedTopicNames,
+        String subscribedTopicRegex,
+        String serverAssignorName,
+        List<ClientAssignor> clientAssignors,
+        MemberState state,
+        Map<Uuid, Set<Integer>> assignedPartitions,
+        Map<Uuid, Set<Integer>> partitionsPendingRevocation,
+        Map<Uuid, Set<Integer>> partitionsPendingAssignment
+    ) {
+        this.memberId = memberId;
+        this.memberEpoch = memberEpoch;
+        this.previousMemberEpoch = previousMemberEpoch;
+        this.nextMemberEpoch = nextMemberEpoch;
+        this.instanceId = instanceId;
+        this.rackId = rackId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.subscribedTopicNames = subscribedTopicNames;
+        this.subscribedTopicRegex = subscribedTopicRegex;
+        this.serverAssignorName = serverAssignorName;
+        this.clientAssignors = clientAssignors;
+        this.state = state;
+        this.assignedPartitions = assignedPartitions;
+        this.partitionsPendingRevocation = partitionsPendingRevocation;
+        this.partitionsPendingAssignment = partitionsPendingAssignment;
+    }
+
+    /**
+     * @return The member id.
+     */
+    public String memberId() {
+        return memberId;
+    }
+
+    /**
+     * @return The current member epoch.
+     */
+    public int memberEpoch() {
+        return memberEpoch;
+    }
+
+    /**
+     * @return The previous member epoch.
+     */
+    public int previousMemberEpoch() {
+        return previousMemberEpoch;
+    }
+
+    /**
+     * @return The next member epoch.
+     */
+    public int nextMemberEpoch() {
+        return nextMemberEpoch;
+    }
+
+    /**
+     * @return The instance id.
+     */
+    public String instanceId() {
+        return instanceId;
+    }
+
+    /**
+     * @return The rack id.
+     */
+    public String rackId() {
+        return rackId;
+    }
+
+    /**
+     * @return The rebalance timeout in millis.
+     */
+    public int rebalanceTimeoutMs() {
+        return rebalanceTimeoutMs;
+    }
+
+    /**
+     * @return The client id.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * @return The client host.
+     */
+    public String clientHost() {
+        return clientHost;
+    }
+
+    /**
+     * @return The list of subscribed topic names.
+     */
+    public List<String> subscribedTopicNames() {
+        return subscribedTopicNames;
+    }
+
+    /**
+     * @return The regular expression based subscription.
+     */
+    public String subscribedTopicRegex() {
+        return subscribedTopicRegex;
+    }
+
+    /**
+     * @return The server side assignor or an empty optional.
+     */
+    public Optional<String> serverAssignorName() {
+        return Optional.ofNullable(serverAssignorName);
+    }
+
+    /**
+     * @return The list of client side assignors.
+     */
+    public List<ClientAssignor> clientAssignors() {
+        return clientAssignors;
+    }
+
+    /**
+     * @return The current state.
+     */
+    public MemberState state() {
+        return state;
+    }
+
+    /**
+     * @return The set of assigned partitions.
+     */
+    public Map<Uuid, Set<Integer>> assignedPartitions() {
+        return assignedPartitions;
+    }
+
+    /**
+     * @return The set of partitions awaiting revocation from the member.
+     */
+    public Map<Uuid, Set<Integer>> partitionsPendingRevocation() {
+        return partitionsPendingRevocation;
+    }
+
+    /**
+     * @return The set of partitions awaiting assignment to the member.
+     */
+    public Map<Uuid, Set<Integer>> partitionsPendingAssignment() {
+        return partitionsPendingAssignment;
+    }
+
+    /**
+     * @return A one-line summary of the epochs, the state and the three partition maps of this member.
+     */
+    public String currentAssignmentSummary() {
+        return "CurrentAssignment(" +
+            "memberEpoch=" + memberEpoch +
+            ", previousMemberEpoch=" + previousMemberEpoch +
+            ", nextMemberEpoch=" + nextMemberEpoch +
+            ", state=" + state +
+            ", assignedPartitions=" + assignedPartitions +
+            ", partitionsPendingRevocation=" + partitionsPendingRevocation +
+            ", partitionsPendingAssignment=" + partitionsPendingAssignment +
+            ')';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConsumerGroupMember that = (ConsumerGroupMember) o;
+        return memberEpoch == that.memberEpoch
+            && previousMemberEpoch == that.previousMemberEpoch
+            && nextMemberEpoch == that.nextMemberEpoch
+            && rebalanceTimeoutMs == that.rebalanceTimeoutMs
+            && Objects.equals(memberId, that.memberId)
+            && Objects.equals(instanceId, that.instanceId)
+            && Objects.equals(rackId, that.rackId)
+            && Objects.equals(clientId, that.clientId)
+            && Objects.equals(clientHost, that.clientHost)
+            && Objects.equals(subscribedTopicNames, that.subscribedTopicNames)
+            && Objects.equals(subscribedTopicRegex, that.subscribedTopicRegex)
+            && Objects.equals(serverAssignorName, that.serverAssignorName)
+            && Objects.equals(clientAssignors, that.clientAssignors)
+            && Objects.equals(assignedPartitions, that.assignedPartitions)
+            && Objects.equals(partitionsPendingRevocation, that.partitionsPendingRevocation)
+            && Objects.equals(partitionsPendingAssignment, that.partitionsPendingAssignment);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = memberId != null ? memberId.hashCode() : 0;
+        result = 31 * result + memberEpoch;
+        result = 31 * result + previousMemberEpoch;
+        result = 31 * result + nextMemberEpoch;
+        result = 31 * result + Objects.hashCode(instanceId);
+        result = 31 * result + Objects.hashCode(rackId);
+        result = 31 * result + rebalanceTimeoutMs;
+        result = 31 * result + Objects.hashCode(clientId);
+        result = 31 * result + Objects.hashCode(clientHost);
+        result = 31 * result + Objects.hashCode(subscribedTopicNames);
+        result = 31 * result + Objects.hashCode(subscribedTopicRegex);
+        result = 31 * result + Objects.hashCode(serverAssignorName);
+        result = 31 * result + Objects.hashCode(clientAssignors);
+        result = 31 * result + Objects.hashCode(assignedPartitions);
+        result = 31 * result + Objects.hashCode(partitionsPendingRevocation);
+        result = 31 * result + Objects.hashCode(partitionsPendingAssignment);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "ConsumerGroupMember(" +
+            "memberId='" + memberId + '\'' +
+            ", memberEpoch=" + memberEpoch +
+            ", previousMemberEpoch=" + previousMemberEpoch +
+            ", nextMemberEpoch=" + nextMemberEpoch +
+            ", instanceId='" + instanceId + '\'' +
+            ", rackId='" + rackId + '\'' +
+            ", rebalanceTimeoutMs=" + rebalanceTimeoutMs +
+            ", clientId='" + clientId + '\'' +
+            ", clientHost='" + clientHost + '\'' +
+            ", subscribedTopicNames=" + subscribedTopicNames +
+            ", subscribedTopicRegex='" + subscribedTopicRegex + '\'' +
+            ", serverAssignorName='" + serverAssignorName + '\'' +
+            ", clientAssignors=" + clientAssignors +
+            ", state=" + state +
+            ", assignedPartitions=" + assignedPartitions +
+            ", partitionsPendingRevocation=" + partitionsPendingRevocation +
+            ", partitionsPendingAssignment=" + partitionsPendingAssignment +
+            ')';
+    }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java
new file mode 100644
index 00000000000..7df34650be8
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ConsumerGroupMemberTest {
+
+    @Test
+    public void testNewMember() {
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+        Uuid topicId3 = Uuid.randomUuid();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setNextMemberEpoch(11)
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client-id")
+            .setClientHost("hostname")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setSubscribedTopicRegex("regex")
+            .setServerAssignorName("range")
+            .setClientAssignors(Collections.singletonList(
+                new ClientAssignor(
+                    "assignor",
+                    (byte) 0,
+                    (byte) 0,
+                    (byte) 1,
+                    new VersionedMetadata(
+                        (byte) 1,
+                        ByteBuffer.allocate(0)))))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(topicId1, 1, 2, 3)))
+            .setPartitionsPendingRevocation(mkAssignment(
+                mkTopicAssignment(topicId2, 4, 5, 6)))
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(topicId3, 7, 8, 9)))
+            .build();
+
+        assertEquals("member-id", member.memberId());
+        assertEquals(10, member.memberEpoch());
+        assertEquals(9, member.previousMemberEpoch());
+        assertEquals(11, member.nextMemberEpoch());
+        assertEquals("instance-id", member.instanceId());
+        assertEquals("rack-id", member.rackId());
+        assertEquals("client-id", member.clientId());
+        assertEquals("hostname", member.clientHost());
+        // Names are sorted.
+        assertEquals(Arrays.asList("bar", "foo"), member.subscribedTopicNames());
+        assertEquals("regex", member.subscribedTopicRegex());
+        assertEquals("range", member.serverAssignorName().get());
+        assertEquals(
+            Collections.singletonList(
+                new ClientAssignor(
+                    "assignor",
+                    (byte) 0,
+                    (byte) 0,
+                    (byte) 1,
+                    new VersionedMetadata(
+                        (byte) 1,
+                        ByteBuffer.allocate(0)))),
+            member.clientAssignors());
+        assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), member.assignedPartitions());
+        assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), member.partitionsPendingRevocation());
+        assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), member.partitionsPendingAssignment());
+    }
+
+    @Test
+    public void testEquals() {
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+        Uuid topicId3 = Uuid.randomUuid();
+
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setNextMemberEpoch(11)
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client-id")
+            .setClientHost("hostname")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setSubscribedTopicRegex("regex")
+            .setServerAssignorName("range")
+            .setClientAssignors(Collections.singletonList(
+                new ClientAssignor(
+                    "assignor",
+                    (byte) 0,
+                    (byte) 0,
+                    (byte) 1,
+                    new VersionedMetadata(
+                        (byte) 1,
+                        ByteBuffer.allocate(0)))))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(topicId1, 1, 2, 3)))
+            .setPartitionsPendingRevocation(mkAssignment(
+                mkTopicAssignment(topicId2, 4, 5, 6)))
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(topicId3, 7, 8, 9)))
+            .build();
+
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setNextMemberEpoch(11)
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client-id")
+            .setClientHost("hostname")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setSubscribedTopicRegex("regex")
+            .setServerAssignorName("range")
+            .setClientAssignors(Collections.singletonList(
+                new ClientAssignor(
+                    "assignor",
+                    (byte) 0,
+                    (byte) 0,
+                    (byte) 1,
+                    new VersionedMetadata(
+                        (byte) 1,
+                        ByteBuffer.allocate(0)))))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(topicId1, 1, 2, 3)))
+            .setPartitionsPendingRevocation(mkAssignment(
+                mkTopicAssignment(topicId2, 4, 5, 6)))
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(topicId3, 7, 8, 9)))
+            .build();
+
+        assertEquals(member1, member2);
+    }
+
+    /**
+     * Verifies the {@code maybeUpdate*} methods of {@link ConsumerGroupMember.Builder}:
+     * empty optionals must leave a copied member equal to the original, while present
+     * values must be reflected by the built member.
+     */
+    @Test
+    public void testUpdateMember() {
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+        Uuid topicId3 = Uuid.randomUuid();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setNextMemberEpoch(11)
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client-id")
+            .setClientHost("hostname")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setSubscribedTopicRegex("regex")
+            .setServerAssignorName("range")
+            .setClientAssignors(Collections.singletonList(
+                new ClientAssignor(
+                    "assignor",
+                    (byte) 0,
+                    (byte) 0,
+                    (byte) 1,
+                    new VersionedMetadata(
+                        (byte) 1,
+                        ByteBuffer.allocate(0)))))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(topicId1, 1, 2, 3)))
+            .setPartitionsPendingRevocation(mkAssignment(
+                mkTopicAssignment(topicId2, 4, 5, 6)))
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(topicId3, 7, 8, 9)))
+            .build();
+
+        // Every update is empty, so the rebuilt member must equal the original.
+        ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(member)
+            .maybeUpdateRackId(Optional.empty())
+            .maybeUpdateInstanceId(Optional.empty())
+            .maybeUpdateServerAssignorName(Optional.empty())
+            .maybeUpdateSubscribedTopicNames(Optional.empty())
+            .maybeUpdateSubscribedTopicRegex(Optional.empty())
+            .maybeUpdateRebalanceTimeoutMs(OptionalInt.empty())
+            .maybeUpdateClientAssignors(Optional.empty())
+            .build();
+
+        assertEquals(member, updatedMember);
+
+        // Every update carries a value; each one is verified below.
+        updatedMember = new ConsumerGroupMember.Builder(member)
+            .maybeUpdateRackId(Optional.of("new-rack-id"))
+            .maybeUpdateInstanceId(Optional.of("new-instance-id"))
+            .maybeUpdateServerAssignorName(Optional.of("new-assignor"))
+            .maybeUpdateSubscribedTopicNames(Optional.of(Arrays.asList("zar")))
+            .maybeUpdateSubscribedTopicRegex(Optional.of("new-regex"))
+            .maybeUpdateRebalanceTimeoutMs(OptionalInt.of(6000))
+            .maybeUpdateClientAssignors(Optional.of(Collections.emptyList()))
+            .build();
+
+        assertEquals("new-instance-id", updatedMember.instanceId());
+        assertEquals("new-rack-id", updatedMember.rackId());
+        // The single new name replaces the previous subscription list.
+        assertEquals(Arrays.asList("zar"), updatedMember.subscribedTopicNames());
+        assertEquals("new-regex", updatedMember.subscribedTopicRegex());
+        assertEquals("new-assignor", updatedMember.serverAssignorName().get());
+        // The rebalance timeout is updated above, so it must be verified as well.
+        assertEquals(6000, updatedMember.rebalanceTimeoutMs());
+        assertEquals(Collections.emptyList(), updatedMember.clientAssignors());
+    }
+
+    /**
+     * Verifies that {@code Builder.updateWith(ConsumerGroupMemberMetadataValue)} carries the
+     * fields of the record over to the built member.
+     */
+    @Test
+    public void testUpdateWithConsumerGroupMemberMetadataValue() {
+        ConsumerGroupMemberMetadataValue record = new ConsumerGroupMemberMetadataValue()
+            .setAssignors(Collections.singletonList(new ConsumerGroupMemberMetadataValue.Assignor()
+                .setName("client")
+                .setMinimumVersion((short) 0)
+                .setMaximumVersion((short) 2)
+                .setVersion((short) 1)
+                .setMetadata(new byte[0])))
+            .setServerAssignor("range")
+            .setClientId("client-id")
+            .setClientHost("host-id")
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setRebalanceTimeoutMs(1000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setSubscribedTopicRegex("regex");
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id")
+            .updateWith(record)
+            .build();
+
+        assertEquals("instance-id", member.instanceId());
+        assertEquals("rack-id", member.rackId());
+        assertEquals("client-id", member.clientId());
+        assertEquals("host-id", member.clientHost());
+        // The record sets a rebalance timeout, so it must be verified like the other fields.
+        assertEquals(1000, member.rebalanceTimeoutMs());
+        // Names are sorted.
+        assertEquals(Arrays.asList("bar", "foo"), member.subscribedTopicNames());
+        assertEquals("regex", member.subscribedTopicRegex());
+        assertEquals("range", member.serverAssignorName().get());
+        assertEquals(
+            Collections.singletonList(
+                new ClientAssignor(
+                    "client",
+                    (byte) 0,
+                    (byte) 0,
+                    (byte) 2,
+                    new VersionedMetadata(
+                        (byte) 1,
+                        ByteBuffer.allocate(0)))),
+            member.clientAssignors());
+    }
+
+    /**
+     * Checks that a builder updated from a {@code ConsumerGroupCurrentMemberAssignmentValue}
+     * record yields a member exposing the record's three epochs and its three partition sets.
+     */
+    @Test
+    public void testUpdateWithConsumerGroupCurrentMemberAssignmentValue() {
+        Uuid assignedTopicId = Uuid.randomUuid();
+        Uuid revokingTopicId = Uuid.randomUuid();
+        Uuid assigningTopicId = Uuid.randomUuid();
+
+        // One topic entry per partition set carried by the record.
+        ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions assigned =
+            new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                .setTopicId(assignedTopicId)
+                .setPartitions(Arrays.asList(0, 1, 2));
+        ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions pendingRevocation =
+            new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                .setTopicId(revokingTopicId)
+                .setPartitions(Arrays.asList(3, 4, 5));
+        ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions pendingAssignment =
+            new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                .setTopicId(assigningTopicId)
+                .setPartitions(Arrays.asList(6, 7, 8));
+
+        ConsumerGroupCurrentMemberAssignmentValue assignmentValue = new ConsumerGroupCurrentMemberAssignmentValue()
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(11)
+            .setAssignedPartitions(Collections.singletonList(assigned))
+            .setPartitionsPendingRevocation(Collections.singletonList(pendingRevocation))
+            .setPartitionsPendingAssignment(Collections.singletonList(pendingAssignment));
+
+        ConsumerGroupMember.Builder builder = new ConsumerGroupMember.Builder("member-id");
+        ConsumerGroupMember member = builder.updateWith(assignmentValue).build();
+
+        // Epochs: the record's target epoch is read back as the member's next epoch.
+        assertEquals(10, member.memberEpoch());
+        assertEquals(9, member.previousMemberEpoch());
+        assertEquals(11, member.nextMemberEpoch());
+
+        // Partition sets, keyed by topic id.
+        assertEquals(
+            mkAssignment(mkTopicAssignment(assignedTopicId, 0, 1, 2)),
+            member.assignedPartitions());
+        assertEquals(
+            mkAssignment(mkTopicAssignment(revokingTopicId, 3, 4, 5)),
+            member.partitionsPendingRevocation());
+        assertEquals(
+            mkAssignment(mkTopicAssignment(assigningTopicId, 6, 7, 8)),
+            member.partitionsPendingAssignment());
+    }
+}