You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "jeffkbkim (via GitHub)" <gi...@apache.org> on 2023/04/26 22:36:38 UTC

[GitHub] [kafka] jeffkbkim opened a new pull request, #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata

jeffkbkim opened a new pull request, #13644:
URL: https://github.com/apache/kafka/pull/13644

   Rewrites MemberMetadata as GenericGroupMember that will be used with the new group coordinator.
   
   Written on top of https://github.com/apache/kafka/pull/13639, will rebase once it's merged. Files touched:
   * `GenericGroupMember.java`
   * `GenericGroupMemberTest.java` // TODO
   * `JoinGroupResult.java`
   * `SyncGroupResult.java`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1185444113


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols
+    ) {
+        this(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            protocolType,
+            supportedProtocols,
+            new byte[0]
+        );
+    }
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols,
+        byte[] assignment
+    ) {
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.protocolType = protocolType;
+        this.supportedProtocols = supportedProtocols;
+        this.assignment = assignment;
+    }
+
+    /**
+     * @return true if the member is utilizing static membership, false otherwise.
+     */
+    public boolean isStaticMember() {
+        return groupInstanceId.isPresent();
+    }
+
+    /**
+     * @return whether the member is awaiting join.
+     */
+    public boolean isAwaitingJoin() {
+        return awaitingJoinCallback != null;
+    }
+
+    /**
+     * @return whether the member is awaiting sync.
+     */
+    public boolean isAwaitingSync() {
+        return awaitingSyncCallback != null;
+    }
+
+    /**
+     * Get the metadata corresponding to the provided protocol.
+     */
+    public byte[] metadata(String protocolName) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            protocol.name().equals(protocolName))
+                .findFirst();
+
+        if (match.isPresent()) {
+            return match.get().metadata();
+        } else {
+            throw new IllegalArgumentException("Member does not support protocol " +
+                protocolName);
+        }
+    }
+
+    /**
+     * The heartbeat is always considered satisfied when an existing member has made a
+     * successful join/sync group request during a rebalance.
+     *
+     * @return true if heartbeat was satisfied; false otherwise.
+     */
+    public boolean hasSatisfiedHeartbeat() {
+        if (isNew) {
+            // New members can be expired while awaiting join, so we have to check this first
+            return heartbeatSatisfied;
+        } else if (isAwaitingJoin() || isAwaitingSync()) {
+            // Members that are awaiting a rebalance automatically satisfy expected heartbeats
+            return true;
+        } else {
+            // Otherwise, we require the next heartbeat
+            return heartbeatSatisfied;
+        }
+    }
+
+    /**
+     * @param protocols list of protocols to match.
+     * @return true if the given list matches the member's list of supported protocols,
+     *         false otherwise.
+     */
+    public boolean matches(List<Protocol> protocols) {
+        return protocols.equals(this.supportedProtocols);
+    }
+
+    /**
+     * @param protocolName the protocol name.
+     * @return MemberSummary object with metadata corresponding to the protocol name.
+     */
+    public MemberSummary summary(String protocolName) {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            metadata(protocolName),
+            assignment
+        );
+    }
+
+    /**
+     * @return MemberSummary object with no metadata.
+     */
+    public MemberSummary summaryNoMetadata() {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            new byte[0],
+            new byte[0]
+        );
+    }
+
+    /**
+     * Vote for one of the potential group protocols. This takes into account the protocol preference as
+     * indicated by the order of supported protocols and returns the first one also contained in the set
+     */
+    public String vote(Set<String> candidates) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            candidates.contains(protocol.name())).findFirst();
+        
+        if (match.isPresent()) {
+            return match.get().name();
+        } else {
+            throw new IllegalArgumentException("Member does not support any of the candidate protocols");
+        }
+    }
+
+    /**
+     * @param supportedProtocols list of supported protocols.
+     * @return a set of protocol names from the given list of supported protocols.
+     */
+    public static Set<String> plainProtocolSet(List<Protocol> supportedProtocols) {
+        return supportedProtocols.stream().map(Protocol::name).collect(Collectors.toSet());
+    }
+
+    /**
+     * @return the member id.
+     */
+    public String memberId() {
+        return memberId;
+    }
+
+    /**
+     * @return the group instance id.
+     */
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+    /**
+     * @return the client id.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * @return the client host.
+     */
+    public String clientHost() {
+        return clientHost;
+    }
+
+    /**
+     * @return the rebalance timeout in milliseconds.
+     */
+    public int rebalanceTimeoutMs() {
+        return rebalanceTimeoutMs;
+    }
+
+    /**
+     * @return the session timeout in milliseconds.
+     */
+    public int sessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    /**
+     * @return the protocol type.
+     */
+    public String protocolType() {
+        return protocolType;
+    }
+
+    /**
+     * @return the list of supported protocols.
+     */
+    public List<Protocol> supportedProtocols() {
+        return supportedProtocols;
+    }
+
+    /**
+     * @return the member's assignment.
+     */
+    public byte[] assignment() {
+        return assignment;
+    }
+
+    /**
+     * @return the awaiting join callback.
+     */
+    public CompletableFuture<JoinGroupResponseData> awaitingJoinCallback() {
+        return awaitingJoinCallback;
+    }
+
+    /**
+     * @param value the updated join callback.
+     */
+    public void setAwaitingJoinCallback(CompletableFuture<JoinGroupResponseData> value) {
+        this.awaitingJoinCallback = value;
+    }
+
+    /**
+     * @return the awaiting sync callback.
+     */
+    public CompletableFuture<SyncGroupResponseData> awaitingSyncCallback() {
+        return awaitingSyncCallback;
+    }
+
+    /**
+     * @param value the updated sync callback.
+     */
+    public void setAwaitingSyncCallback(CompletableFuture<SyncGroupResponseData> value) {
+        this.awaitingSyncCallback = value;
+    }
+
+    /**
+     * @return true if the member is new, false otherwise.
+     */
+    public boolean isNew() {
+        return isNew;
+    }
+
+    /**
+     * @param value true if the member is new, false otherwise.
+     */
+    public void setIsNew(boolean value) {
+        this.isNew = value;
+    }
+
+    /**
+     * @return true if the existing heartbeat was satisfied, false otherwise.
+     */
+    public boolean heartBeatSatisfied() {
+        return heartbeatSatisfied;
+    }
+
+    /**
+     * @param value whether the heartbeat was satisfied.
+     */
+    public void setHeartBeatSatisfied(boolean value) {
+        this.heartbeatSatisfied = value;
+    }
+
+    @Override
+    public String toString() {
+        return "GenericGroupMember(" +
+            "memberId='" + memberId + '\'' +
+            ", groupInstanceId='" + groupInstanceId + '\'' +
+            ", clientId='" + clientId + '\'' +
+            ", clientHost='" + clientHost + '\'' +
+            ", rebalanceTimeoutMs=" + rebalanceTimeoutMs +
+            ", sessionTimeoutMs=" + sessionTimeoutMs +
+            ", protocolType='" + protocolType + '\'' +
+            ", supportedProtocols=" + supportedProtocols +
+            ')';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        GenericGroupMember that = (GenericGroupMember) o;
+
+        return memberId.equals(that.memberId) &&
+            groupInstanceId.equals(that.groupInstanceId) &&
+            clientId.equals(that.clientId) &&
+            clientHost.equals(that.clientHost) &&
+            rebalanceTimeoutMs == that.rebalanceTimeoutMs &&
+            sessionTimeoutMs == that.sessionTimeoutMs &&
+            protocolType.equals(that.protocolType) &&
+            supportedProtocols.equals(that.supportedProtocols) &&
+            Arrays.equals(assignment, that.assignment);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(

Review Comment:
   Wow, TIL we can generate this :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1185441081


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols
+    ) {
+        this(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            protocolType,
+            supportedProtocols,
+            new byte[0]
+        );
+    }
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols,
+        byte[] assignment
+    ) {
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.protocolType = protocolType;
+        this.supportedProtocols = supportedProtocols;
+        this.assignment = assignment;
+    }
+
+    /**
+     * @return true if the member is utilizing static membership, false otherwise.
+     */
+    public boolean isStaticMember() {
+        return groupInstanceId.isPresent();
+    }
+
+    /**
+     * @return whether the member is awaiting join.
+     */
+    public boolean isAwaitingJoin() {
+        return awaitingJoinCallback != null;
+    }
+
+    /**
+     * @return whether the member is awaiting sync.
+     */
+    public boolean isAwaitingSync() {
+        return awaitingSyncCallback != null;
+    }
+
+    /**
+     * Get the metadata corresponding to the provided protocol.
+     */
+    public byte[] metadata(String protocolName) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            protocol.name().equals(protocolName))
+                .findFirst();
+
+        if (match.isPresent()) {
+            return match.get().metadata();
+        } else {
+            throw new IllegalArgumentException("Member does not support protocol " +
+                protocolName);
+        }
+    }
+
+    /**
+     * The heartbeat is always considered satisfied when an existing member has made a
+     * successful join/sync group request during a rebalance.
+     *
+     * @return true if heartbeat was satisfied; false otherwise.
+     */
+    public boolean hasSatisfiedHeartbeat() {
+        if (isNew) {
+            // New members can be expired while awaiting join, so we have to check this first
+            return heartbeatSatisfied;
+        } else if (isAwaitingJoin() || isAwaitingSync()) {
+            // Members that are awaiting a rebalance automatically satisfy expected heartbeats
+            return true;
+        } else {
+            // Otherwise, we require the next heartbeat
+            return heartbeatSatisfied;
+        }
+    }
+
+    /**
+     * @param protocols list of protocols to match.
+     * @return true if the given list matches the member's list of supported protocols,
+     *         false otherwise.
+     */
+    public boolean matches(List<Protocol> protocols) {
+        return protocols.equals(this.supportedProtocols);
+    }
+
+    /**
+     * @param protocolName the protocol name.
+     * @return MemberSummary object with metadata corresponding to the protocol name.
+     */
+    public MemberSummary summary(String protocolName) {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            metadata(protocolName),
+            assignment
+        );
+    }
+
+    /**
+     * @return MemberSummary object with no metadata.
+     */
+    public MemberSummary summaryNoMetadata() {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            new byte[0],
+            new byte[0]
+        );
+    }
+
+    /**
+     * Vote for one of the potential group protocols. This takes into account the protocol preference as
+     * indicated by the order of supported protocols and returns the first one also contained in the set
+     */
+    public String vote(Set<String> candidates) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            candidates.contains(protocol.name())).findFirst();
+        
+        if (match.isPresent()) {
+            return match.get().name();
+        } else {
+            throw new IllegalArgumentException("Member does not support any of the candidate protocols");
+        }
+    }
+
+    /**
+     * @param supportedProtocols list of supported protocols.
+     * @return a set of protocol names from the given list of supported protocols.
+     */
+    public static Set<String> plainProtocolSet(List<Protocol> supportedProtocols) {
+        return supportedProtocols.stream().map(Protocol::name).collect(Collectors.toSet());
+    }
+
+    /**
+     * @return the member id.
+     */
+    public String memberId() {
+        return memberId;
+    }
+
+    /**
+     * @return the group instance id.
+     */
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+    /**
+     * @return the client id.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * @return the client host.
+     */
+    public String clientHost() {
+        return clientHost;
+    }
+
+    /**
+     * @return the rebalance timeout in milliseconds.
+     */
+    public int rebalanceTimeoutMs() {
+        return rebalanceTimeoutMs;
+    }
+
+    /**
+     * @return the session timeout in milliseconds.
+     */
+    public int sessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    /**
+     * @return the protocol type.
+     */
+    public String protocolType() {
+        return protocolType;
+    }
+
+    /**
+     * @return the list of supported protocols.
+     */
+    public List<Protocol> supportedProtocols() {
+        return supportedProtocols;
+    }
+
+    /**
+     * @return the member's assignment.
+     */
+    public byte[] assignment() {
+        return assignment;
+    }
+
+    /**
+     * @return the awaiting join callback.
+     */
+    public CompletableFuture<JoinGroupResponseData> awaitingJoinCallback() {
+        return awaitingJoinCallback;
+    }
+
+    /**
+     * @param value the updated join callback.
+     */
+    public void setAwaitingJoinCallback(CompletableFuture<JoinGroupResponseData> value) {
+        this.awaitingJoinCallback = value;
+    }
+
+    /**
+     * @return the awaiting sync callback.
+     */
+    public CompletableFuture<SyncGroupResponseData> awaitingSyncCallback() {
+        return awaitingSyncCallback;
+    }
+
+    /**
+     * @param value the updated sync callback.
+     */
+    public void setAwaitingSyncCallback(CompletableFuture<SyncGroupResponseData> value) {
+        this.awaitingSyncCallback = value;
+    }
+
+    /**
+     * @return true if the member is new, false otherwise.
+     */
+    public boolean isNew() {
+        return isNew;
+    }
+
+    /**
+     * @param value true if the member is new, false otherwise.
+     */
+    public void setIsNew(boolean value) {
+        this.isNew = value;
+    }
+
+    /**
+     * @return true if the existing heartbeat was satisfied, false otherwise.
+     */
+    public boolean heartBeatSatisfied() {
+        return heartbeatSatisfied;
+    }
+
+    /**
+     * @param value whether the heartbeat was satisfied.
+     */
+    public void setHeartBeatSatisfied(boolean value) {
+        this.heartbeatSatisfied = value;
+    }
+
+    @Override
+    public String toString() {
+        return "GenericGroupMember(" +
+            "memberId='" + memberId + '\'' +
+            ", groupInstanceId='" + groupInstanceId + '\'' +
+            ", clientId='" + clientId + '\'' +
+            ", clientHost='" + clientHost + '\'' +
+            ", rebalanceTimeoutMs=" + rebalanceTimeoutMs +
+            ", sessionTimeoutMs=" + sessionTimeoutMs +
+            ", protocolType='" + protocolType + '\'' +
+            ", supportedProtocols=" + supportedProtocols +
+            ')';
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   i added it as i was following the builder pattern. let me remove it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1185332905


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;

Review Comment:
   i removed the final keyword and added a setter method in https://github.com/apache/kafka/pull/13663/files#diff-7ec3bedcac3e1a182a8aed96e4e40f0b5ac3295c90bbc862799b7b57e07f983dR494-R499
   
   as it's only used by the group metadata. should i refactor it here or keep it as is and change it in https://github.com/apache/kafka/pull/13663?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols
+    ) {
+        this(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            protocolType,
+            supportedProtocols,
+            new byte[0]

Review Comment:
   thanks for the suggestion!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1183141858


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+    /**
+     * A builder allowing to create a new generic member or update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private Optional<String> groupInstanceId = Optional.empty();
+        private String clientId = "";
+        private String clientHost = "";
+        private int rebalanceTimeoutMs = -1;
+        private int sessionTimeoutMs = -1;
+        private String protocolType = "";
+        private List<Protocol> supportedProtocols = Collections.emptyList();
+        private byte[] assignment = new byte[0];
+
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        public Builder(GenericGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.groupInstanceId = member.groupInstanceId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.sessionTimeoutMs = member.sessionTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.protocolType = member.protocolType;
+            this.supportedProtocols = member.supportedProtocols;
+            this.assignment = member.assignment;
+        }
+
+        public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
+            this.groupInstanceId = groupInstanceId;
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        public Builder setProtocolType(String protocolType) {
+            this.protocolType = protocolType;
+            return this;
+        }
+
+        public Builder setSupportedProtocols(List<Protocol> protocols) {
+            this.supportedProtocols = protocols;
+            return this;
+        }
+
+        public Builder setAssignment(byte[] assignment) {
+            this.assignment = assignment;
+            return this;
+        }
+
+        public GenericGroupMember build() {
+            return new GenericGroupMember(
+                memberId,
+                groupInstanceId,
+                clientId,
+                clientHost,
+                rebalanceTimeoutMs,
+                sessionTimeoutMs,
+                protocolType,
+                supportedProtocols,
+                assignment
+            );
+        }
+    }
+
+    private static class MemberSummary {

Review Comment:
   The reason that i kept the class is because we use this to encapsulate information for both
   `ListGroups` and `DescribeGroup` apis. i.e. this is used in https://github.com/apache/kafka/pull/13663/files#diff-59d730ac42eb65d0a800f16cd34dc14fab8efbb8d70f7c220d1d3ecb82b412a6R663
   
   The alternative would be to to just use the `GenericGroupMember` and `GenericGroup` objects instead. If you think this is better, I can create a jira to refactor this once we merge both to trunk. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1187462145


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;

Review Comment:
   It is better to refactor here. It is confusing otherwise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on PR #13644:
URL: https://github.com/apache/kafka/pull/13644#issuecomment-1540246709

   @dajac the test failure is unrelated. can we merge this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1183141858


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+    /**
+     * A builder allowing to create a new generic member or update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private Optional<String> groupInstanceId = Optional.empty();
+        private String clientId = "";
+        private String clientHost = "";
+        private int rebalanceTimeoutMs = -1;
+        private int sessionTimeoutMs = -1;
+        private String protocolType = "";
+        private List<Protocol> supportedProtocols = Collections.emptyList();
+        private byte[] assignment = new byte[0];
+
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        public Builder(GenericGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.groupInstanceId = member.groupInstanceId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.sessionTimeoutMs = member.sessionTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.protocolType = member.protocolType;
+            this.supportedProtocols = member.supportedProtocols;
+            this.assignment = member.assignment;
+        }
+
+        public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
+            this.groupInstanceId = groupInstanceId;
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        public Builder setProtocolType(String protocolType) {
+            this.protocolType = protocolType;
+            return this;
+        }
+
+        public Builder setSupportedProtocols(List<Protocol> protocols) {
+            this.supportedProtocols = protocols;
+            return this;
+        }
+
+        public Builder setAssignment(byte[] assignment) {
+            this.assignment = assignment;
+            return this;
+        }
+
+        public GenericGroupMember build() {
+            return new GenericGroupMember(
+                memberId,
+                groupInstanceId,
+                clientId,
+                clientHost,
+                rebalanceTimeoutMs,
+                sessionTimeoutMs,
+                protocolType,
+                supportedProtocols,
+                assignment
+            );
+        }
+    }
+
+    private static class MemberSummary {

Review Comment:
   The reason that i kept the class is because we use this to encapsulate information for both
   `ListGroups` and `DescribeGroup` apis (similar reasoning for retaining `GroupSummary` in https://github.com/apache/kafka/pull/13663/). i.e. this is used in https://github.com/apache/kafka/pull/13663/files#diff-59d730ac42eb65d0a800f16cd34dc14fab8efbb8d70f7c220d1d3ecb82b412a6R663
   
   The alternative would be to to just use the `GenericGroupMember` and `GenericGroup` objects instead. If you think this is better, I can create a jira to refactor this once we merge both to trunk. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac merged pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac merged PR #13644:
URL: https://github.com/apache/kafka/pull/13644


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1185004444


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;

Review Comment:
   nit: Should we call this one `awaitingJoinCallbackFuture`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols
+    ) {
+        this(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            protocolType,
+            supportedProtocols,
+            new byte[0]
+        );
+    }
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols,
+        byte[] assignment
+    ) {
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.protocolType = protocolType;
+        this.supportedProtocols = supportedProtocols;
+        this.assignment = assignment;
+    }
+
+    /**
+     * @return true if the member is utilizing static membership, false otherwise.
+     */
+    public boolean isStaticMember() {
+        return groupInstanceId.isPresent();
+    }
+
+    /**
+     * @return whether the member is awaiting join.
+     */
+    public boolean isAwaitingJoin() {
+        return awaitingJoinCallback != null;
+    }
+
+    /**
+     * @return whether the member is awaiting sync.
+     */
+    public boolean isAwaitingSync() {
+        return awaitingSyncCallback != null;
+    }
+
+    /**
+     * Get the metadata corresponding to the provided protocol.
+     */
+    public byte[] metadata(String protocolName) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            protocol.name().equals(protocolName))
+                .findFirst();
+
+        if (match.isPresent()) {
+            return match.get().metadata();
+        } else {
+            throw new IllegalArgumentException("Member does not support protocol " +
+                protocolName);
+        }
+    }
+
+    /**
+     * The heartbeat is always considered satisfied when an existing member has made a
+     * successful join/sync group request during a rebalance.
+     *
+     * @return true if heartbeat was satisfied; false otherwise.
+     */
+    public boolean hasSatisfiedHeartbeat() {
+        if (isNew) {
+            // New members can be expired while awaiting join, so we have to check this first
+            return heartbeatSatisfied;
+        } else if (isAwaitingJoin() || isAwaitingSync()) {
+            // Members that are awaiting a rebalance automatically satisfy expected heartbeats
+            return true;
+        } else {
+            // Otherwise, we require the next heartbeat
+            return heartbeatSatisfied;
+        }
+    }
+
+    /**
+     * @param protocols list of protocols to match.
+     * @return true if the given list matches the member's list of supported protocols,
+     *         false otherwise.
+     */
+    public boolean matches(List<Protocol> protocols) {
+        return protocols.equals(this.supportedProtocols);
+    }
+
+    /**
+     * @param protocolName the protocol name.
+     * @return MemberSummary object with metadata corresponding to the protocol name.
+     */
+    public MemberSummary summary(String protocolName) {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            metadata(protocolName),
+            assignment
+        );
+    }
+
+    /**
+     * @return MemberSummary object with no metadata.
+     */
+    public MemberSummary summaryNoMetadata() {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            new byte[0],
+            new byte[0]
+        );
+    }
+
+    /**
+     * Vote for one of the potential group protocols. This takes into account the protocol preference as
+     * indicated by the order of supported protocols and returns the first one also contained in the set
+     */
+    public String vote(Set<String> candidates) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            candidates.contains(protocol.name())).findFirst();
+        
+        if (match.isPresent()) {
+            return match.get().name();
+        } else {
+            throw new IllegalArgumentException("Member does not support any of the candidate protocols");
+        }
+    }
+
+    /**
+     * @param supportedProtocols list of supported protocols.
+     * @return a set of protocol names from the given list of supported protocols.
+     */
+    public static Set<String> plainProtocolSet(List<Protocol> supportedProtocols) {
+        return supportedProtocols.stream().map(Protocol::name).collect(Collectors.toSet());
+    }
+
+    /**
+     * @return the member id.
+     */
+    public String memberId() {
+        return memberId;
+    }
+
+    /**
+     * @return the group instance id.
+     */
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+    /**
+     * @return the client id.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * @return the client host.
+     */
+    public String clientHost() {
+        return clientHost;
+    }
+
+    /**
+     * @return the rebalance timeout in milliseconds.
+     */
+    public int rebalanceTimeoutMs() {
+        return rebalanceTimeoutMs;
+    }
+
+    /**
+     * @return the session timeout in milliseconds.
+     */
+    public int sessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    /**
+     * @return the protocol type.
+     */
+    public String protocolType() {
+        return protocolType;
+    }
+
+    /**
+     * @return the list of supported protocols.
+     */
+    public List<Protocol> supportedProtocols() {
+        return supportedProtocols;
+    }
+
+    /**
+     * @return the member's assignment.
+     */
+    public byte[] assignment() {
+        return assignment;
+    }
+
+    /**
+     * @return the awaiting join callback.
+     */
+    public CompletableFuture<JoinGroupResponseData> awaitingJoinCallback() {
+        return awaitingJoinCallback;
+    }
+
+    /**
+     * @param value the updated join callback.
+     */
+    public void setAwaitingJoinCallback(CompletableFuture<JoinGroupResponseData> value) {
+        this.awaitingJoinCallback = value;
+    }
+
+    /**
+     * @return the awaiting sync callback.
+     */
+    public CompletableFuture<SyncGroupResponseData> awaitingSyncCallback() {
+        return awaitingSyncCallback;
+    }
+
+    /**
+     * @param value the updated sync callback.
+     */
+    public void setAwaitingSyncCallback(CompletableFuture<SyncGroupResponseData> value) {
+        this.awaitingSyncCallback = value;
+    }
+
+    /**
+     * @return true if the member is new, false otherwise.
+     */
+    public boolean isNew() {
+        return isNew;
+    }
+
+    /**
+     * @param value true if the member is new, false otherwise.
+     */
+    public void setIsNew(boolean value) {
+        this.isNew = value;
+    }
+
+    /**
+     * @return true if the existing heartbeat was satisfied, false otherwise.
+     */
+    public boolean heartBeatSatisfied() {
+        return heartbeatSatisfied;
+    }
+
+    /**
+     * @param value whether the heartbeat was satisfied.
+     */
+    public void setHeartBeatSatisfied(boolean value) {
+        this.heartbeatSatisfied = value;
+    }
+
+    @Override
+    public String toString() {
+        return "GenericGroupMember(" +
+            "memberId='" + memberId + '\'' +
+            ", groupInstanceId='" + groupInstanceId + '\'' +
+            ", clientId='" + clientId + '\'' +
+            ", clientHost='" + clientHost + '\'' +
+            ", rebalanceTimeoutMs=" + rebalanceTimeoutMs +
+            ", sessionTimeoutMs=" + sessionTimeoutMs +
+            ", protocolType='" + protocolType + '\'' +
+            ", supportedProtocols=" + supportedProtocols +
+            ')';
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   Do we actually need `equals` and `hashCode`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.

Review Comment:
   nit: `callback` -> `future`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupMemberTest.java:
##########
@@ -0,0 +1,252 @@
+package org.apache.kafka.coordinator.group.generic;

Review Comment:
   nit: header is missing.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/Protocol.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * This class represents a protocol that is supported by a
+ * {@link GenericGroupMember}.
+ */
+public class Protocol {
+
+    /**
+     * the name of the protocol.
+     */
+    private final String name;
+
+    /**
+     * the protocol's metadata.
+     */
+    private final byte[] metadata;
+
+    public Protocol(String name, byte[] metadata) {
+        this.name = Objects.requireNonNull(name);
+        this.metadata = metadata;
+    }
+
+    public String name() {
+        return this.name;
+    }
+
+    public byte[] metadata() {
+        return this.metadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Protocol that = (Protocol) o;
+        return name.equals(that.name) &&
+            Arrays.equals(this.metadata, that.metadata);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            name,
+            Arrays.hashCode(metadata)
+        );
+    }
+
+    @Override
+    public String toString() {
+        return "Protocol(name= + " + name +

Review Comment:
   nit: ` + ` must be removed. 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;

Review Comment:
   In `MemberMetadata`, this field and the following ones are all mutable. What's your plan for this?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols
+    ) {
+        this(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            protocolType,
+            supportedProtocols,
+            new byte[0]
+        );
+    }
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols,
+        byte[] assignment
+    ) {
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.protocolType = protocolType;
+        this.supportedProtocols = supportedProtocols;
+        this.assignment = assignment;
+    }
+
+    /**
+     * @return true if the member is utilizing static membership, false otherwise.
+     */
+    public boolean isStaticMember() {
+        return groupInstanceId.isPresent();
+    }
+
+    /**
+     * @return whether the member is awaiting join.
+     */
+    public boolean isAwaitingJoin() {
+        return awaitingJoinCallback != null;
+    }
+
+    /**
+     * @return whether the member is awaiting sync.
+     */
+    public boolean isAwaitingSync() {
+        return awaitingSyncCallback != null;
+    }
+
+    /**
+     * Get the metadata corresponding to the provided protocol.
+     */
+    public byte[] metadata(String protocolName) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            protocol.name().equals(protocolName))
+                .findFirst();
+
+        if (match.isPresent()) {
+            return match.get().metadata();
+        } else {
+            throw new IllegalArgumentException("Member does not support protocol " +
+                protocolName);
+        }
+    }
+
+    /**
+     * The heartbeat is always considered satisfied when an existing member has made a
+     * successful join/sync group request during a rebalance.
+     *
+     * @return true if heartbeat was satisfied; false otherwise.
+     */
+    public boolean hasSatisfiedHeartbeat() {
+        if (isNew) {
+            // New members can be expired while awaiting join, so we have to check this first
+            return heartbeatSatisfied;
+        } else if (isAwaitingJoin() || isAwaitingSync()) {
+            // Members that are awaiting a rebalance automatically satisfy expected heartbeats
+            return true;
+        } else {
+            // Otherwise, we require the next heartbeat
+            return heartbeatSatisfied;
+        }
+    }
+
+    /**
+     * @param protocols list of protocols to match.

Review Comment:
   nit: Could we put a first line which describe the method? We never starts with `@param`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols
+    ) {
+        this(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            protocolType,
+            supportedProtocols,
+            new byte[0]
+        );
+    }
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols,
+        byte[] assignment
+    ) {
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.protocolType = protocolType;
+        this.supportedProtocols = supportedProtocols;
+        this.assignment = assignment;
+    }
+
+    /**
+     * @return true if the member is utilizing static membership, false otherwise.
+     */
+    public boolean isStaticMember() {
+        return groupInstanceId.isPresent();
+    }
+
+    /**
+     * @return whether the member is awaiting join.
+     */
+    public boolean isAwaitingJoin() {
+        return awaitingJoinCallback != null;
+    }
+
+    /**
+     * @return whether the member is awaiting sync.
+     */
+    public boolean isAwaitingSync() {
+        return awaitingSyncCallback != null;
+    }
+
+    /**
+     * Get the metadata corresponding to the provided protocol.
+     */
+    public byte[] metadata(String protocolName) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            protocol.name().equals(protocolName))
+                .findFirst();

Review Comment:
   nit: The following format would be more conventional:
   
   ```
   Optional<Protocol> match = supportedProtocols.stream()
       .filter(protocol -> protocol.name().equals(protocolName))
       .findFirst();
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols
+    ) {
+        this(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            protocolType,
+            supportedProtocols,
+            new byte[0]

Review Comment:
   nit: Should we define a constant for this?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols
+    ) {
+        this(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            protocolType,
+            supportedProtocols,
+            new byte[0]
+        );
+    }
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols,
+        byte[] assignment
+    ) {
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.protocolType = protocolType;
+        this.supportedProtocols = supportedProtocols;
+        this.assignment = assignment;
+    }
+
+    /**
+     * @return true if the member is utilizing static membership, false otherwise.
+     */
+    public boolean isStaticMember() {
+        return groupInstanceId.isPresent();
+    }
+
+    /**
+     * @return whether the member is awaiting join.
+     */
+    public boolean isAwaitingJoin() {
+        return awaitingJoinCallback != null;
+    }
+
+    /**
+     * @return whether the member is awaiting sync.
+     */
+    public boolean isAwaitingSync() {
+        return awaitingSyncCallback != null;
+    }
+
+    /**
+     * Get the metadata corresponding to the provided protocol.
+     */
+    public byte[] metadata(String protocolName) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            protocol.name().equals(protocolName))
+                .findFirst();
+
+        if (match.isPresent()) {
+            return match.get().metadata();
+        } else {
+            throw new IllegalArgumentException("Member does not support protocol " +
+                protocolName);
+        }
+    }
+
+    /**
+     * The heartbeat is always considered satisfied when an existing member has made a
+     * successful join/sync group request during a rebalance.
+     *
+     * @return true if heartbeat was satisfied; false otherwise.
+     */
+    public boolean hasSatisfiedHeartbeat() {
+        if (isNew) {
+            // New members can be expired while awaiting join, so we have to check this first
+            return heartbeatSatisfied;
+        } else if (isAwaitingJoin() || isAwaitingSync()) {
+            // Members that are awaiting a rebalance automatically satisfy expected heartbeats
+            return true;
+        } else {
+            // Otherwise, we require the next heartbeat
+            return heartbeatSatisfied;
+        }
+    }
+
+    /**
+     * @param protocols list of protocols to match.
+     * @return true if the given list matches the member's list of supported protocols,
+     *         false otherwise.
+     */
+    public boolean matches(List<Protocol> protocols) {
+        return protocols.equals(this.supportedProtocols);
+    }
+
+    /**
+     * @param protocolName the protocol name.
+     * @return MemberSummary object with metadata corresponding to the protocol name.
+     */
+    public MemberSummary summary(String protocolName) {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            metadata(protocolName),
+            assignment
+        );
+    }
+
+    /**
+     * @return MemberSummary object with no metadata.
+     */
+    public MemberSummary summaryNoMetadata() {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            new byte[0],
+            new byte[0]
+        );
+    }
+
+    /**
+     * Vote for one of the potential group protocols. This takes into account the protocol preference as
+     * indicated by the order of supported protocols and returns the first one also contained in the set
+     */
+    public String vote(Set<String> candidates) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            candidates.contains(protocol.name())).findFirst();

Review Comment:
   nit: Same comment about the format of the code here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols
+    ) {
+        this(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            protocolType,
+            supportedProtocols,
+            new byte[0]
+        );
+    }
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols,
+        byte[] assignment
+    ) {
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.protocolType = protocolType;
+        this.supportedProtocols = supportedProtocols;
+        this.assignment = assignment;
+    }
+
+    /**
+     * @return true if the member is utilizing static membership, false otherwise.
+     */
+    public boolean isStaticMember() {
+        return groupInstanceId.isPresent();
+    }
+
+    /**
+     * @return whether the member is awaiting join.
+     */
+    public boolean isAwaitingJoin() {
+        return awaitingJoinCallback != null;
+    }
+
+    /**
+     * @return whether the member is awaiting sync.
+     */
+    public boolean isAwaitingSync() {
+        return awaitingSyncCallback != null;
+    }
+
+    /**
+     * Get the metadata corresponding to the provided protocol.
+     */
+    public byte[] metadata(String protocolName) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            protocol.name().equals(protocolName))
+                .findFirst();
+
+        if (match.isPresent()) {
+            return match.get().metadata();
+        } else {
+            throw new IllegalArgumentException("Member does not support protocol " +
+                protocolName);
+        }
+    }
+
+    /**
+     * The heartbeat is always considered satisfied when an existing member has made a
+     * successful join/sync group request during a rebalance.
+     *
+     * @return true if heartbeat was satisfied; false otherwise.
+     */
+    public boolean hasSatisfiedHeartbeat() {
+        if (isNew) {
+            // New members can be expired while awaiting join, so we have to check this first
+            return heartbeatSatisfied;
+        } else if (isAwaitingJoin() || isAwaitingSync()) {
+            // Members that are awaiting a rebalance automatically satisfy expected heartbeats
+            return true;
+        } else {
+            // Otherwise, we require the next heartbeat
+            return heartbeatSatisfied;
+        }
+    }
+
+    /**
+     * @param protocols list of protocols to match.
+     * @return true if the given list matches the member's list of supported protocols,
+     *         false otherwise.
+     */
+    public boolean matches(List<Protocol> protocols) {
+        return protocols.equals(this.supportedProtocols);
+    }
+
+    /**
+     * @param protocolName the protocol name.
+     * @return MemberSummary object with metadata corresponding to the protocol name.
+     */
+    public MemberSummary summary(String protocolName) {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            metadata(protocolName),
+            assignment
+        );
+    }
+
+    /**
+     * @return MemberSummary object with no metadata.
+     */
+    public MemberSummary summaryNoMetadata() {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            new byte[0],
+            new byte[0]
+        );
+    }
+
+    /**
+     * Vote for one of the potential group protocols. This takes into account the protocol preference as
+     * indicated by the order of supported protocols and returns the first one also contained in the set
+     */

Review Comment:
   nit: `@param`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/Protocol.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * This class represents a protocol that is supported by a
+ * {@link GenericGroupMember}.
+ */
+public class Protocol {
+
+    /**
+     * the name of the protocol.

Review Comment:
   nit: `The.`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols
+    ) {
+        this(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            protocolType,
+            supportedProtocols,
+            new byte[0]
+        );
+    }
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols,
+        byte[] assignment
+    ) {
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.protocolType = protocolType;
+        this.supportedProtocols = supportedProtocols;
+        this.assignment = assignment;
+    }
+
+    /**
+     * @return true if the member is utilizing static membership, false otherwise.
+     */
+    public boolean isStaticMember() {
+        return groupInstanceId.isPresent();
+    }
+
+    /**
+     * @return whether the member is awaiting join.
+     */
+    public boolean isAwaitingJoin() {
+        return awaitingJoinCallback != null;
+    }
+
+    /**
+     * @return whether the member is awaiting sync.
+     */
+    public boolean isAwaitingSync() {
+        return awaitingSyncCallback != null;
+    }
+
+    /**
+     * Get the metadata corresponding to the provided protocol.
+     */
+    public byte[] metadata(String protocolName) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            protocol.name().equals(protocolName))
+                .findFirst();
+
+        if (match.isPresent()) {
+            return match.get().metadata();
+        } else {
+            throw new IllegalArgumentException("Member does not support protocol " +
+                protocolName);
+        }
+    }
+
+    /**
+     * The heartbeat is always considered satisfied when an existing member has made a
+     * successful join/sync group request during a rebalance.
+     *
+     * @return true if heartbeat was satisfied; false otherwise.
+     */
+    public boolean hasSatisfiedHeartbeat() {
+        if (isNew) {
+            // New members can be expired while awaiting join, so we have to check this first
+            return heartbeatSatisfied;
+        } else if (isAwaitingJoin() || isAwaitingSync()) {
+            // Members that are awaiting a rebalance automatically satisfy expected heartbeats
+            return true;
+        } else {
+            // Otherwise, we require the next heartbeat
+            return heartbeatSatisfied;
+        }
+    }
+
+    /**
+     * @param protocols list of protocols to match.
+     * @return true if the given list matches the member's list of supported protocols,
+     *         false otherwise.
+     */
+    public boolean matches(List<Protocol> protocols) {
+        return protocols.equals(this.supportedProtocols);
+    }
+
+    /**
+     * @param protocolName the protocol name.
+     * @return MemberSummary object with metadata corresponding to the protocol name.
+     */
+    public MemberSummary summary(String protocolName) {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            metadata(protocolName),
+            assignment
+        );
+    }
+
+    /**
+     * @return MemberSummary object with no metadata.
+     */
+    public MemberSummary summaryNoMetadata() {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            new byte[0],
+            new byte[0]
+        );
+    }
+
+    /**
+     * Vote for one of the potential group protocols. This takes into account the protocol preference as
+     * indicated by the order of supported protocols and returns the first one also contained in the set
+     */
+    public String vote(Set<String> candidates) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            candidates.contains(protocol.name())).findFirst();
+        
+        if (match.isPresent()) {
+            return match.get().name();
+        } else {
+            throw new IllegalArgumentException("Member does not support any of the candidate protocols");
+        }
+    }
+
+    /**
+     * @param supportedProtocols list of supported protocols.

Review Comment:
   ditto about the javadoc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols
+    ) {
+        this(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            rebalanceTimeoutMs,
+            sessionTimeoutMs,
+            protocolType,
+            supportedProtocols,
+            new byte[0]
+        );
+    }
+
+    public GenericGroupMember(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String clientHost,
+        int rebalanceTimeoutMs,
+        int sessionTimeoutMs,
+        String protocolType,
+        List<Protocol> supportedProtocols,
+        byte[] assignment
+    ) {
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.protocolType = protocolType;
+        this.supportedProtocols = supportedProtocols;
+        this.assignment = assignment;
+    }
+
+    /**
+     * @return true if the member is utilizing static membership, false otherwise.
+     */
+    public boolean isStaticMember() {
+        return groupInstanceId.isPresent();
+    }
+
+    /**
+     * @return whether the member is awaiting join.
+     */
+    public boolean isAwaitingJoin() {
+        return awaitingJoinCallback != null;
+    }
+
+    /**
+     * @return whether the member is awaiting sync.
+     */
+    public boolean isAwaitingSync() {
+        return awaitingSyncCallback != null;
+    }
+
+    /**
+     * Get the metadata corresponding to the provided protocol.
+     */
+    public byte[] metadata(String protocolName) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            protocol.name().equals(protocolName))
+                .findFirst();
+
+        if (match.isPresent()) {
+            return match.get().metadata();
+        } else {
+            throw new IllegalArgumentException("Member does not support protocol " +
+                protocolName);
+        }
+    }
+
+    /**
+     * The heartbeat is always considered satisfied when an existing member has made a
+     * successful join/sync group request during a rebalance.
+     *
+     * @return true if heartbeat was satisfied; false otherwise.
+     */
+    public boolean hasSatisfiedHeartbeat() {
+        if (isNew) {
+            // New members can be expired while awaiting join, so we have to check this first
+            return heartbeatSatisfied;
+        } else if (isAwaitingJoin() || isAwaitingSync()) {
+            // Members that are awaiting a rebalance automatically satisfy expected heartbeats
+            return true;
+        } else {
+            // Otherwise, we require the next heartbeat
+            return heartbeatSatisfied;
+        }
+    }
+
+    /**
+     * @param protocols list of protocols to match.
+     * @return true if the given list matches the member's list of supported protocols,
+     *         false otherwise.
+     */
+    public boolean matches(List<Protocol> protocols) {
+        return protocols.equals(this.supportedProtocols);
+    }
+
+    /**
+     * @param protocolName the protocol name.
+     * @return MemberSummary object with metadata corresponding to the protocol name.
+     */
+    public MemberSummary summary(String protocolName) {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            metadata(protocolName),
+            assignment
+        );
+    }
+
+    /**
+     * @return MemberSummary object with no metadata.
+     */
+    public MemberSummary summaryNoMetadata() {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            new byte[0],
+            new byte[0]
+        );
+    }
+
+    /**
+     * Vote for one of the potential group protocols. This takes into account the protocol preference as
+     * indicated by the order of supported protocols and returns the first one also contained in the set
+     */
+    public String vote(Set<String> candidates) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            candidates.contains(protocol.name())).findFirst();
+        
+        if (match.isPresent()) {
+            return match.get().name();
+        } else {
+            throw new IllegalArgumentException("Member does not support any of the candidate protocols");
+        }
+    }
+
+    /**
+     * @param supportedProtocols list of supported protocols.
+     * @return a set of protocol names from the given list of supported protocols.
+     */
+    public static Set<String> plainProtocolSet(List<Protocol> supportedProtocols) {
+        return supportedProtocols.stream().map(Protocol::name).collect(Collectors.toSet());
+    }
+
+    /**
+     * @return the member id.
+     */
+    public String memberId() {
+        return memberId;
+    }
+
+    /**
+     * @return the group instance id.
+     */
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+    /**
+     * @return the client id.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * @return the client host.
+     */
+    public String clientHost() {
+        return clientHost;
+    }
+
+    /**
+     * @return the rebalance timeout in milliseconds.
+     */
+    public int rebalanceTimeoutMs() {
+        return rebalanceTimeoutMs;
+    }
+
+    /**
+     * @return the session timeout in milliseconds.
+     */
+    public int sessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    /**
+     * @return the protocol type.
+     */
+    public String protocolType() {
+        return protocolType;
+    }
+
+    /**
+     * @return the list of supported protocols.
+     */
+    public List<Protocol> supportedProtocols() {
+        return supportedProtocols;
+    }
+
+    /**
+     * @return the member's assignment.
+     */
+    public byte[] assignment() {
+        return assignment;
+    }
+
+    /**
+     * @return the awaiting join callback.
+     */
+    public CompletableFuture<JoinGroupResponseData> awaitingJoinCallback() {
+        return awaitingJoinCallback;
+    }
+
+    /**
+     * @param value the updated join callback.
+     */
+    public void setAwaitingJoinCallback(CompletableFuture<JoinGroupResponseData> value) {
+        this.awaitingJoinCallback = value;
+    }
+
+    /**
+     * @return the awaiting sync callback.
+     */
+    public CompletableFuture<SyncGroupResponseData> awaitingSyncCallback() {
+        return awaitingSyncCallback;
+    }
+
+    /**
+     * @param value the updated sync callback.
+     */
+    public void setAwaitingSyncCallback(CompletableFuture<SyncGroupResponseData> value) {
+        this.awaitingSyncCallback = value;
+    }
+
+    /**
+     * @return true if the member is new, false otherwise.
+     */
+    public boolean isNew() {
+        return isNew;
+    }
+
+    /**
+     * @param value true if the member is new, false otherwise.
+     */
+    public void setIsNew(boolean value) {
+        this.isNew = value;
+    }
+
+    /**
+     * @return true if the existing heartbeat was satisfied, false otherwise.
+     */
+    public boolean heartBeatSatisfied() {
+        return heartbeatSatisfied;
+    }
+
+    /**
+     * @param value whether the heartbeat was satisfied.
+     */
+    public void setHeartBeatSatisfied(boolean value) {
+        this.heartbeatSatisfied = value;
+    }
+
+    @Override
+    public String toString() {
+        return "GenericGroupMember(" +
+            "memberId='" + memberId + '\'' +
+            ", groupInstanceId='" + groupInstanceId + '\'' +
+            ", clientId='" + clientId + '\'' +
+            ", clientHost='" + clientHost + '\'' +
+            ", rebalanceTimeoutMs=" + rebalanceTimeoutMs +
+            ", sessionTimeoutMs=" + sessionTimeoutMs +
+            ", protocolType='" + protocolType + '\'' +
+            ", supportedProtocols=" + supportedProtocols +
+            ')';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        GenericGroupMember that = (GenericGroupMember) o;
+
+        return memberId.equals(that.memberId) &&
+            groupInstanceId.equals(that.groupInstanceId) &&
+            clientId.equals(that.clientId) &&
+            clientHost.equals(that.clientHost) &&
+            rebalanceTimeoutMs == that.rebalanceTimeoutMs &&
+            sessionTimeoutMs == that.sessionTimeoutMs &&
+            protocolType.equals(that.protocolType) &&
+            supportedProtocols.equals(that.supportedProtocols) &&
+            Arrays.equals(assignment, that.assignment);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(

Review Comment:
   I think that we usually prefer to old style because it does not require to convert primitive types objects. e.g.
   
   ```
           final int prime = 31;
           int result = prime + topicId.hashCode();
           result = prime * result + topicPartition.hashCode();
           return result;
   ```
   
   You can auto generate this with the IDE.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/Protocol.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * This class represents a protocol that is supported by a
+ * {@link GenericGroupMember}.
+ */
+public class Protocol {
+
+    /**
+     * the name of the protocol.
+     */
+    private final String name;
+
+    /**
+     * the protocol's metadata.
+     */
+    private final byte[] metadata;
+
+    public Protocol(String name, byte[] metadata) {
+        this.name = Objects.requireNonNull(name);
+        this.metadata = metadata;
+    }
+
+    public String name() {

Review Comment:
   nit: javadoc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/Protocol.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * This class represents a protocol that is supported by a
+ * {@link GenericGroupMember}.
+ */
+public class Protocol {
+
+    /**
+     * the name of the protocol.
+     */
+    private final String name;
+
+    /**
+     * the protocol's metadata.

Review Comment:
   nit: `The.`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/Protocol.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * This class represents a protocol that is supported by a
+ * {@link GenericGroupMember}.
+ */
+public class Protocol {
+
+    /**
+     * the name of the protocol.
+     */
+    private final String name;
+
+    /**
+     * the protocol's metadata.
+     */
+    private final byte[] metadata;
+
+    public Protocol(String name, byte[] metadata) {
+        this.name = Objects.requireNonNull(name);
+        this.metadata = metadata;
+    }
+
+    public String name() {
+        return this.name;
+    }
+
+    public byte[] metadata() {
+        return this.metadata;

Review Comment:
   nit: javadoc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/Protocol.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * This class represents a protocol that is supported by a
+ * {@link GenericGroupMember}.
+ */
+public class Protocol {
+
+    /**
+     * the name of the protocol.
+     */
+    private final String name;
+
+    /**
+     * the protocol's metadata.
+     */
+    private final byte[] metadata;
+
+    public Protocol(String name, byte[] metadata) {
+        this.name = Objects.requireNonNull(name);
+        this.metadata = metadata;
+    }
+
+    public String name() {
+        return this.name;
+    }
+
+    public byte[] metadata() {
+        return this.metadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Protocol that = (Protocol) o;
+        return name.equals(that.name) &&
+            Arrays.equals(this.metadata, that.metadata);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(

Review Comment:
   nit: ditto for the hashcode implementation.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+    /**
+     * A builder allowing to create a new generic member or update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private Optional<String> groupInstanceId = Optional.empty();
+        private String clientId = "";
+        private String clientHost = "";
+        private int rebalanceTimeoutMs = -1;
+        private int sessionTimeoutMs = -1;
+        private String protocolType = "";
+        private List<Protocol> supportedProtocols = Collections.emptyList();
+        private byte[] assignment = new byte[0];
+
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        public Builder(GenericGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.groupInstanceId = member.groupInstanceId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.sessionTimeoutMs = member.sessionTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.protocolType = member.protocolType;
+            this.supportedProtocols = member.supportedProtocols;
+            this.assignment = member.assignment;
+        }
+
+        public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
+            this.groupInstanceId = groupInstanceId;
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        public Builder setProtocolType(String protocolType) {
+            this.protocolType = protocolType;
+            return this;
+        }
+
+        public Builder setSupportedProtocols(List<Protocol> protocols) {
+            this.supportedProtocols = protocols;
+            return this;
+        }
+
+        public Builder setAssignment(byte[] assignment) {
+            this.assignment = assignment;
+            return this;
+        }
+
+        public GenericGroupMember build() {
+            return new GenericGroupMember(
+                memberId,
+                groupInstanceId,
+                clientId,
+                clientHost,
+                rebalanceTimeoutMs,
+                sessionTimeoutMs,
+                protocolType,
+                supportedProtocols,
+                assignment
+            );
+        }
+    }
+
+    private static class MemberSummary {

Review Comment:
   In the GroupCoordinatorAdaptor, `MemberSummary` is converted to `ListedGroup` or `DescribedGroup`. I think that we could skip the `GroupSummary` and just build those from `GenericGroupMember` and `GenericGroup` as you suggested. `MemberSummary` does not seem necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1182474216


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+    /**
+     * A builder allowing to create a new generic member or update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private Optional<String> groupInstanceId = Optional.empty();
+        private String clientId = "";
+        private String clientHost = "";
+        private int rebalanceTimeoutMs = -1;
+        private int sessionTimeoutMs = -1;
+        private String protocolType = "";
+        private List<Protocol> supportedProtocols = Collections.emptyList();
+        private byte[] assignment = new byte[0];
+
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        public Builder(GenericGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.groupInstanceId = member.groupInstanceId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.sessionTimeoutMs = member.sessionTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.protocolType = member.protocolType;
+            this.supportedProtocols = member.supportedProtocols;
+            this.assignment = member.assignment;
+        }
+
+        public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
+            this.groupInstanceId = groupInstanceId;
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        public Builder setProtocolType(String protocolType) {
+            this.protocolType = protocolType;
+            return this;
+        }
+
+        public Builder setSupportedProtocols(List<Protocol> protocols) {
+            this.supportedProtocols = protocols;
+            return this;
+        }
+
+        public Builder setAssignment(byte[] assignment) {
+            this.assignment = assignment;
+            return this;
+        }
+
+        public GenericGroupMember build() {
+            return new GenericGroupMember(
+                memberId,
+                groupInstanceId,
+                clientId,
+                clientHost,
+                rebalanceTimeoutMs,
+                sessionTimeoutMs,
+                protocolType,
+                supportedProtocols,
+                assignment
+            );
+        }
+    }
+
+    private static class MemberSummary {

Review Comment:
   Could we also use a auto-generate class to replace this?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+    /**
+     * A builder allowing to create a new generic member or update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+     * definition of the fields.
+     */
+    public static class Builder {

Review Comment:
   I wonder if a Builder is the right pattern given that most of the fields are mutable.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+    /**
+     * A builder allowing to create a new generic member or update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private Optional<String> groupInstanceId = Optional.empty();
+        private String clientId = "";
+        private String clientHost = "";
+        private int rebalanceTimeoutMs = -1;
+        private int sessionTimeoutMs = -1;
+        private String protocolType = "";
+        private List<Protocol> supportedProtocols = Collections.emptyList();
+        private byte[] assignment = new byte[0];
+
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        public Builder(GenericGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.groupInstanceId = member.groupInstanceId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.sessionTimeoutMs = member.sessionTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.protocolType = member.protocolType;
+            this.supportedProtocols = member.supportedProtocols;
+            this.assignment = member.assignment;
+        }
+
+        public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
+            this.groupInstanceId = groupInstanceId;
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        public Builder setProtocolType(String protocolType) {
+            this.protocolType = protocolType;
+            return this;
+        }
+
+        public Builder setSupportedProtocols(List<Protocol> protocols) {
+            this.supportedProtocols = protocols;
+            return this;
+        }
+
+        public Builder setAssignment(byte[] assignment) {
+            this.assignment = assignment;
+            return this;
+        }
+
+        public GenericGroupMember build() {
+            return new GenericGroupMember(
+                memberId,
+                groupInstanceId,
+                clientId,
+                clientHost,
+                rebalanceTimeoutMs,
+                sessionTimeoutMs,
+                protocolType,
+                supportedProtocols,
+                assignment
+            );
+        }
+    }
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(String memberId,
+                              Optional<String> groupInstanceId,
+                              String clientId,
+                              String clientHost,
+                              int rebalanceTimeoutMs,
+                              int sessionTimeoutMs,
+                              String protocolType,
+                              List<Protocol> supportedProtocols) {
+
+        this(memberId,

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+    /**
+     * A builder allowing to create a new generic member or update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private Optional<String> groupInstanceId = Optional.empty();
+        private String clientId = "";
+        private String clientHost = "";
+        private int rebalanceTimeoutMs = -1;
+        private int sessionTimeoutMs = -1;
+        private String protocolType = "";
+        private List<Protocol> supportedProtocols = Collections.emptyList();
+        private byte[] assignment = new byte[0];
+
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        public Builder(GenericGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.groupInstanceId = member.groupInstanceId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.sessionTimeoutMs = member.sessionTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.protocolType = member.protocolType;
+            this.supportedProtocols = member.supportedProtocols;
+            this.assignment = member.assignment;
+        }
+
+        public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
+            this.groupInstanceId = groupInstanceId;
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        public Builder setProtocolType(String protocolType) {
+            this.protocolType = protocolType;
+            return this;
+        }
+
+        public Builder setSupportedProtocols(List<Protocol> protocols) {
+            this.supportedProtocols = protocols;
+            return this;
+        }
+
+        public Builder setAssignment(byte[] assignment) {
+            this.assignment = assignment;
+            return this;
+        }
+
+        public GenericGroupMember build() {
+            return new GenericGroupMember(
+                memberId,
+                groupInstanceId,
+                clientId,
+                clientHost,
+                rebalanceTimeoutMs,
+                sessionTimeoutMs,
+                protocolType,
+                supportedProtocols,
+                assignment
+            );
+        }
+    }
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(String memberId,
+                              Optional<String> groupInstanceId,
+                              String clientId,
+                              String clientHost,
+                              int rebalanceTimeoutMs,
+                              int sessionTimeoutMs,
+                              String protocolType,
+                              List<Protocol> supportedProtocols) {
+
+        this(memberId,
+             groupInstanceId,
+             clientId,
+             clientHost,
+             rebalanceTimeoutMs,
+             sessionTimeoutMs,
+             protocolType,
+             supportedProtocols,
+             new byte[0]);
+    }
+
+    public GenericGroupMember(String memberId,

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+    /**
+     * A builder allowing to create a new generic member or update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private Optional<String> groupInstanceId = Optional.empty();
+        private String clientId = "";
+        private String clientHost = "";
+        private int rebalanceTimeoutMs = -1;
+        private int sessionTimeoutMs = -1;
+        private String protocolType = "";
+        private List<Protocol> supportedProtocols = Collections.emptyList();
+        private byte[] assignment = new byte[0];
+
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        public Builder(GenericGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.groupInstanceId = member.groupInstanceId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.sessionTimeoutMs = member.sessionTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.protocolType = member.protocolType;
+            this.supportedProtocols = member.supportedProtocols;
+            this.assignment = member.assignment;
+        }
+
+        public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
+            this.groupInstanceId = groupInstanceId;
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        public Builder setProtocolType(String protocolType) {
+            this.protocolType = protocolType;
+            return this;
+        }
+
+        public Builder setSupportedProtocols(List<Protocol> protocols) {
+            this.supportedProtocols = protocols;
+            return this;
+        }
+
+        public Builder setAssignment(byte[] assignment) {
+            this.assignment = assignment;
+            return this;
+        }
+
+        public GenericGroupMember build() {
+            return new GenericGroupMember(
+                memberId,
+                groupInstanceId,
+                clientId,
+                clientHost,
+                rebalanceTimeoutMs,
+                sessionTimeoutMs,
+                protocolType,
+                supportedProtocols,
+                assignment
+            );
+        }
+    }
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(String memberId,
+                              Optional<String> groupInstanceId,
+                              String clientId,
+                              String clientHost,
+                              int rebalanceTimeoutMs,
+                              int sessionTimeoutMs,
+                              String protocolType,
+                              List<Protocol> supportedProtocols) {
+
+        this(memberId,
+             groupInstanceId,
+             clientId,
+             clientHost,
+             rebalanceTimeoutMs,
+             sessionTimeoutMs,
+             protocolType,
+             supportedProtocols,
+             new byte[0]);
+    }
+
+    public GenericGroupMember(String memberId,
+                              Optional<String> groupInstanceId,
+                              String clientId,
+                              String clientHost,
+                              int rebalanceTimeoutMs,
+                              int sessionTimeoutMs,
+                              String protocolType,
+                              List<Protocol> supportedProtocols,
+                              byte[] assignment) {
+
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.protocolType = protocolType;
+        this.supportedProtocols = supportedProtocols;
+        this.assignment = assignment;
+    }
+
+    /**
+     * @return true if the member is utilizing static membership, false otherwise.
+     */
+    public boolean isStaticMember() {
+        return groupInstanceId.isPresent();
+    }
+
+    /**
+     * @return whether the member is awaiting join.
+     */
+    public boolean isAwaitingJoin() {
+        return awaitingJoinCallback != null;
+    }
+
+    /**
+     * @return whether the member is awaiting sync.
+     */
+    public boolean isAwaitingSync() {
+        return awaitingSyncCallback != null;
+    }
+
+    /**
+     * Get the metadata corresponding to the provided protocol.
+     */
+    public byte[] metadata(String protocolName) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            protocol.name().equals(protocolName))
+                .findFirst();
+
+        if (match.isPresent()) {
+            return match.get().metadata();
+        } else {
+            throw new IllegalArgumentException("Member does not support protocol " +
+                protocolName);
+        }
+    }
+
+    /**
+     * The heartbeat is always considered satisfied when an existing member has made a
+     * successful join/sync group request during a rebalance.
+     *
+     * @return true if heartbeat was satisfied; false otherwise.
+     */
+    public boolean hasSatisfiedHeartbeat() {
+        if (isNew) {
+            // New members can be expired while awaiting join, so we have to check this first
+            return heartbeatSatisfied;
+        } else if (isAwaitingJoin() || isAwaitingSync()) {
+            // Members that are awaiting a rebalance automatically satisfy expected heartbeats
+            return true;
+        } else {
+            // Otherwise, we require the next heartbeat
+            return heartbeatSatisfied;
+        }
+    }
+
+    /**
+     * @param protocols list of protocols to match.
+     * @return true if the given list matches the member's list of supported protocols,
+     *         false otherwise.
+     */
+    public boolean matches(List<Protocol> protocols) {
+        return protocols.equals(this.supportedProtocols);
+    }
+
+    /**
+     * @param protocolName the protocol name.
+     * @return MemberSummary object with metadata corresponding to the protocol name.
+     */
+    public MemberSummary summary(String protocolName) {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            metadata(protocolName),
+            assignment
+        );
+    }
+
+    /**
+     * @return MemberSummary object with no metadata.
+     */
+    public MemberSummary summaryNoMetadata() {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            new byte[0],
+            new byte[0]
+        );
+    }
+
+    /**
+     * Vote for one of the potential group protocols. This takes into account the protocol preference as
+     * indicated by the order of supported protocols and returns the first one also contained in the set
+     */
+    public String vote(Set<String> candidates) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            candidates.contains(protocol.name())).findFirst();
+        
+        if (match.isPresent()) {
+            return match.get().name();
+        } else {
+            throw new IllegalArgumentException("Member does not support any of the candidate protocols");
+        }
+    }
+
+    /**
+     * @param supportedProtocols list of supported protocols.
+     * @return a set of protocol names from the given list of supported protocols.
+     */
+    public static Set<String> plainProtocolSet(List<Protocol> supportedProtocols) {
+        return supportedProtocols.stream().map(Protocol::name).collect(Collectors.toSet());
+    }
+
+    /**
+     * @return the member id.
+     */
+    public String memberId() {
+        return memberId;
+    }
+
+    /**
+     * @return the group instance id.
+     */
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+    /**
+     * @return the client id.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * @return the client host.
+     */
+    public String clientHost() {
+        return clientHost;
+    }
+
+    /**
+     * @return the rebalance timeout in milliseconds.
+     */
+    public int rebalanceTimeoutMs() {
+        return rebalanceTimeoutMs;
+    }
+
+    /**
+     * @return the session timeout in milliseconds.
+     */
+    public int sessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    /**
+     * @return the protocol type.
+     */
+    public String protocolType() {
+        return protocolType;
+    }
+
+    /**
+     * @return the list of supported protocols.
+     */
+    public List<Protocol> supportedProtocols() {
+        return supportedProtocols;
+    }
+
+    public byte[] assignment() {
+        return assignment;
+    }
+
+    /**
+     * @return the awaiting join callback.
+     */
+    public CompletableFuture<JoinGroupResponseData> awaitingJoinCallback() {
+        return awaitingJoinCallback;
+    }
+
+    /**
+     * @param value the updated join callback.
+     */
+    public void setAwaitingJoinCallback(CompletableFuture<JoinGroupResponseData> value) {
+        this.awaitingJoinCallback = value;
+    }
+
+    /**
+     * @return the awaiting sync callback.
+     */
+    public CompletableFuture<SyncGroupResponseData> awaitingSyncCallback() {
+        return awaitingSyncCallback;
+    }
+
+    /**
+     * @param value the updated sync callback.
+     */
+    public void setAwaitingSyncCallback(CompletableFuture<SyncGroupResponseData> value) {
+        this.awaitingSyncCallback = value;
+    }
+
+    /**
+     * @return true if the member is new, false otherwise.
+     */
+    public boolean isNew() {
+        return isNew;
+    }
+
+    /**
+     * @param value true if the member is new, false otherwise.
+     */
+    public void setIsNew(boolean value) {
+        this.isNew = value;
+    }
+
+    public boolean heartBeatSatisfied() {

Review Comment:
   nit: javadoc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/Protocol.java:
##########
@@ -0,0 +1,45 @@
+package org.apache.kafka.coordinator.group.generic;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+public class Protocol {
+    private final String name;
+    private final byte[] metadata;
+
+    public Protocol(String name, byte[] metadata) {
+        this.name = name;

Review Comment:
   nit: `Objects.requireNonNull`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/Protocol.java:
##########
@@ -0,0 +1,45 @@
+package org.apache.kafka.coordinator.group.generic;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+public class Protocol {
+    private final String name;
+    private final byte[] metadata;
+
+    public Protocol(String name, byte[] metadata) {
+        this.name = name;
+        this.metadata = metadata;
+    }
+
+    public String name() {
+        return this.name;
+    }
+
+    public byte[] metadata() {
+        return this.metadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Protocol that = (Protocol) o;
+        return name.equals(that.name) &&
+            Arrays.equals(this.metadata, that.metadata);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            name,
+            Arrays.hashCode(metadata)
+        );
+    }
+
+    @Override
+    public String toString() {
+        return name;

Review Comment:
   nit: Should we add the metadata as well?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+    /**
+     * A builder allowing to create a new generic member or update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private Optional<String> groupInstanceId = Optional.empty();
+        private String clientId = "";
+        private String clientHost = "";
+        private int rebalanceTimeoutMs = -1;
+        private int sessionTimeoutMs = -1;
+        private String protocolType = "";
+        private List<Protocol> supportedProtocols = Collections.emptyList();
+        private byte[] assignment = new byte[0];
+
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        public Builder(GenericGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.groupInstanceId = member.groupInstanceId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.sessionTimeoutMs = member.sessionTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.protocolType = member.protocolType;
+            this.supportedProtocols = member.supportedProtocols;
+            this.assignment = member.assignment;
+        }
+
+        public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
+            this.groupInstanceId = groupInstanceId;
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        public Builder setProtocolType(String protocolType) {
+            this.protocolType = protocolType;
+            return this;
+        }
+
+        public Builder setSupportedProtocols(List<Protocol> protocols) {
+            this.supportedProtocols = protocols;
+            return this;
+        }
+
+        public Builder setAssignment(byte[] assignment) {
+            this.assignment = assignment;
+            return this;
+        }
+
+        public GenericGroupMember build() {
+            return new GenericGroupMember(
+                memberId,
+                groupInstanceId,
+                clientId,
+                clientHost,
+                rebalanceTimeoutMs,
+                sessionTimeoutMs,
+                protocolType,
+                supportedProtocols,
+                assignment
+            );
+        }
+    }
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(String memberId,
+                              Optional<String> groupInstanceId,
+                              String clientId,
+                              String clientHost,
+                              int rebalanceTimeoutMs,
+                              int sessionTimeoutMs,
+                              String protocolType,
+                              List<Protocol> supportedProtocols) {
+
+        this(memberId,
+             groupInstanceId,
+             clientId,
+             clientHost,
+             rebalanceTimeoutMs,
+             sessionTimeoutMs,
+             protocolType,
+             supportedProtocols,
+             new byte[0]);
+    }
+
+    public GenericGroupMember(String memberId,
+                              Optional<String> groupInstanceId,
+                              String clientId,
+                              String clientHost,
+                              int rebalanceTimeoutMs,
+                              int sessionTimeoutMs,
+                              String protocolType,
+                              List<Protocol> supportedProtocols,
+                              byte[] assignment) {
+
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.protocolType = protocolType;
+        this.supportedProtocols = supportedProtocols;
+        this.assignment = assignment;
+    }
+
+    /**
+     * @return true if the member is utilizing static membership, false otherwise.
+     */
+    public boolean isStaticMember() {
+        return groupInstanceId.isPresent();
+    }
+
+    /**
+     * @return whether the member is awaiting join.
+     */
+    public boolean isAwaitingJoin() {
+        return awaitingJoinCallback != null;
+    }
+
+    /**
+     * @return whether the member is awaiting sync.
+     */
+    public boolean isAwaitingSync() {
+        return awaitingSyncCallback != null;
+    }
+
+    /**
+     * Get the metadata corresponding to the provided protocol.
+     */
+    public byte[] metadata(String protocolName) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            protocol.name().equals(protocolName))
+                .findFirst();
+
+        if (match.isPresent()) {
+            return match.get().metadata();
+        } else {
+            throw new IllegalArgumentException("Member does not support protocol " +
+                protocolName);
+        }
+    }
+
+    /**
+     * The heartbeat is always considered satisfied when an existing member has made a
+     * successful join/sync group request during a rebalance.
+     *
+     * @return true if heartbeat was satisfied; false otherwise.
+     */
+    public boolean hasSatisfiedHeartbeat() {
+        if (isNew) {
+            // New members can be expired while awaiting join, so we have to check this first
+            return heartbeatSatisfied;
+        } else if (isAwaitingJoin() || isAwaitingSync()) {
+            // Members that are awaiting a rebalance automatically satisfy expected heartbeats
+            return true;
+        } else {
+            // Otherwise, we require the next heartbeat
+            return heartbeatSatisfied;
+        }
+    }
+
+    /**
+     * @param protocols list of protocols to match.
+     * @return true if the given list matches the member's list of supported protocols,
+     *         false otherwise.
+     */
+    public boolean matches(List<Protocol> protocols) {
+        return protocols.equals(this.supportedProtocols);
+    }
+
+    /**
+     * @param protocolName the protocol name.
+     * @return MemberSummary object with metadata corresponding to the protocol name.
+     */
+    public MemberSummary summary(String protocolName) {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            metadata(protocolName),
+            assignment
+        );
+    }
+
+    /**
+     * @return MemberSummary object with no metadata.
+     */
+    public MemberSummary summaryNoMetadata() {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            new byte[0],
+            new byte[0]
+        );
+    }
+
+    /**
+     * Vote for one of the potential group protocols. This takes into account the protocol preference as
+     * indicated by the order of supported protocols and returns the first one also contained in the set
+     */
+    public String vote(Set<String> candidates) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            candidates.contains(protocol.name())).findFirst();
+        
+        if (match.isPresent()) {
+            return match.get().name();
+        } else {
+            throw new IllegalArgumentException("Member does not support any of the candidate protocols");
+        }
+    }
+
+    /**
+     * @param supportedProtocols list of supported protocols.
+     * @return a set of protocol names from the given list of supported protocols.
+     */
+    public static Set<String> plainProtocolSet(List<Protocol> supportedProtocols) {
+        return supportedProtocols.stream().map(Protocol::name).collect(Collectors.toSet());
+    }
+
+    /**
+     * @return the member id.
+     */
+    public String memberId() {
+        return memberId;
+    }
+
+    /**
+     * @return the group instance id.
+     */
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+    /**
+     * @return the client id.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * @return the client host.
+     */
+    public String clientHost() {
+        return clientHost;
+    }
+
+    /**
+     * @return the rebalance timeout in milliseconds.
+     */
+    public int rebalanceTimeoutMs() {
+        return rebalanceTimeoutMs;
+    }
+
+    /**
+     * @return the session timeout in milliseconds.
+     */
+    public int sessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    /**
+     * @return the protocol type.
+     */
+    public String protocolType() {
+        return protocolType;
+    }
+
+    /**
+     * @return the list of supported protocols.
+     */
+    public List<Protocol> supportedProtocols() {
+        return supportedProtocols;
+    }
+
+    public byte[] assignment() {

Review Comment:
   nit: javadoc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+    /**
+     * A builder allowing to create a new generic member or update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private Optional<String> groupInstanceId = Optional.empty();
+        private String clientId = "";
+        private String clientHost = "";
+        private int rebalanceTimeoutMs = -1;
+        private int sessionTimeoutMs = -1;
+        private String protocolType = "";
+        private List<Protocol> supportedProtocols = Collections.emptyList();
+        private byte[] assignment = new byte[0];
+
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        public Builder(GenericGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.groupInstanceId = member.groupInstanceId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.sessionTimeoutMs = member.sessionTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.protocolType = member.protocolType;
+            this.supportedProtocols = member.supportedProtocols;
+            this.assignment = member.assignment;
+        }
+
+        public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
+            this.groupInstanceId = groupInstanceId;
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        public Builder setProtocolType(String protocolType) {
+            this.protocolType = protocolType;
+            return this;
+        }
+
+        public Builder setSupportedProtocols(List<Protocol> protocols) {
+            this.supportedProtocols = protocols;
+            return this;
+        }
+
+        public Builder setAssignment(byte[] assignment) {
+            this.assignment = assignment;
+            return this;
+        }
+
+        public GenericGroupMember build() {
+            return new GenericGroupMember(
+                memberId,
+                groupInstanceId,
+                clientId,
+                clientHost,
+                rebalanceTimeoutMs,
+                sessionTimeoutMs,
+                protocolType,
+                supportedProtocols,
+                assignment
+            );
+        }
+    }
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(String memberId,
+                              Optional<String> groupInstanceId,
+                              String clientId,
+                              String clientHost,
+                              int rebalanceTimeoutMs,
+                              int sessionTimeoutMs,
+                              String protocolType,
+                              List<Protocol> supportedProtocols) {

Review Comment:
   nit: Let use the following format in order to be consistent in the new module:
   
   ```
   public GenericGroupMember(
      String memberId,
      ...
   ) {
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/Protocol.java:
##########
@@ -0,0 +1,45 @@
+package org.apache.kafka.coordinator.group.generic;

Review Comment:
   nit: Header is missing.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/Protocol.java:
##########
@@ -0,0 +1,45 @@
+package org.apache.kafka.coordinator.group.generic;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+public class Protocol {

Review Comment:
   nit: Let's add javadoc as well.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+    /**
+     * A builder allowing to create a new generic member or update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private Optional<String> groupInstanceId = Optional.empty();
+        private String clientId = "";
+        private String clientHost = "";
+        private int rebalanceTimeoutMs = -1;
+        private int sessionTimeoutMs = -1;
+        private String protocolType = "";
+        private List<Protocol> supportedProtocols = Collections.emptyList();
+        private byte[] assignment = new byte[0];
+
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        public Builder(GenericGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.groupInstanceId = member.groupInstanceId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.sessionTimeoutMs = member.sessionTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.protocolType = member.protocolType;
+            this.supportedProtocols = member.supportedProtocols;
+            this.assignment = member.assignment;
+        }
+
+        public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
+            this.groupInstanceId = groupInstanceId;
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        public Builder setProtocolType(String protocolType) {
+            this.protocolType = protocolType;
+            return this;
+        }
+
+        public Builder setSupportedProtocols(List<Protocol> protocols) {
+            this.supportedProtocols = protocols;
+            return this;
+        }
+
+        public Builder setAssignment(byte[] assignment) {
+            this.assignment = assignment;
+            return this;
+        }
+
+        public GenericGroupMember build() {
+            return new GenericGroupMember(
+                memberId,
+                groupInstanceId,
+                clientId,
+                clientHost,
+                rebalanceTimeoutMs,
+                sessionTimeoutMs,
+                protocolType,
+                supportedProtocols,
+                assignment
+            );
+        }
+    }
+
+    private static class MemberSummary {
+        private final String memberId;
+        private final Optional<String> groupInstanceId;
+        private final String clientId;
+        private final String clientHost;
+        private final byte[] metadata;
+        private final byte[] assignment;
+        
+        public MemberSummary(String memberId,
+                             Optional<String> groupInstanceId,
+                             String clientId,
+                             String clientHost,
+                             byte[] metadata,
+                             byte[] assignment) {
+            
+            this.memberId = memberId;
+            this.groupInstanceId = groupInstanceId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.metadata = metadata;
+            this.assignment = assignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public Optional<String> getGroupInstanceId() {
+            return groupInstanceId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public byte[] metadata() {
+            return metadata;
+        }
+
+        public byte[] assignment() {
+            return assignment;
+        }
+        
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The group instance id.
+     */
+    private final Optional<String> groupInstanceId;
+
+    /**
+     * The client id.
+     */
+    private final String clientId;
+
+    /**
+     * The client host.
+     */
+    private final String clientHost;
+
+    /**
+     * The rebalance timeout in milliseconds.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The session timeout in milliseconds.
+     */
+    private final int sessionTimeoutMs;
+
+    /**
+     * The protocol type.
+     */
+    private final String protocolType;
+
+    /**
+     * The list of supported protocols.
+     */
+    private final List<Protocol> supportedProtocols;
+
+    /**
+     * The assignment stored by the client assignor.
+     */
+    private final byte[] assignment;
+
+    /**
+     * The callback that is invoked once this member joins the group.
+     */
+    private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+    /**
+     * The callback that is invoked once this member completes the sync group phase.
+     */
+    private CompletableFuture<SyncGroupResponseData> awaitingSyncCallback = null;
+
+    /**
+     * Indicates whether the member is a new member of the group.
+     */
+    private boolean isNew = false;
+
+    /**
+     * This variable is used to track heartbeat completion through the delayed
+     * heartbeat purgatory. When scheduling a new heartbeat expiration, we set
+     * this value to `false`. Upon receiving the heartbeat (or any other event
+     * indicating the liveness of the client), we set it to `true` so that the
+     * delayed heartbeat can be completed.
+     */
+    private boolean heartbeatSatisfied = false;
+
+
+    public GenericGroupMember(String memberId,
+                              Optional<String> groupInstanceId,
+                              String clientId,
+                              String clientHost,
+                              int rebalanceTimeoutMs,
+                              int sessionTimeoutMs,
+                              String protocolType,
+                              List<Protocol> supportedProtocols) {
+
+        this(memberId,
+             groupInstanceId,
+             clientId,
+             clientHost,
+             rebalanceTimeoutMs,
+             sessionTimeoutMs,
+             protocolType,
+             supportedProtocols,
+             new byte[0]);
+    }
+
+    public GenericGroupMember(String memberId,
+                              Optional<String> groupInstanceId,
+                              String clientId,
+                              String clientHost,
+                              int rebalanceTimeoutMs,
+                              int sessionTimeoutMs,
+                              String protocolType,
+                              List<Protocol> supportedProtocols,
+                              byte[] assignment) {
+
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.protocolType = protocolType;
+        this.supportedProtocols = supportedProtocols;
+        this.assignment = assignment;
+    }
+
+    /**
+     * @return true if the member is utilizing static membership, false otherwise.
+     */
+    public boolean isStaticMember() {
+        return groupInstanceId.isPresent();
+    }
+
+    /**
+     * @return whether the member is awaiting join.
+     */
+    public boolean isAwaitingJoin() {
+        return awaitingJoinCallback != null;
+    }
+
+    /**
+     * @return whether the member is awaiting sync.
+     */
+    public boolean isAwaitingSync() {
+        return awaitingSyncCallback != null;
+    }
+
+    /**
+     * Get the metadata corresponding to the provided protocol.
+     */
+    public byte[] metadata(String protocolName) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            protocol.name().equals(protocolName))
+                .findFirst();
+
+        if (match.isPresent()) {
+            return match.get().metadata();
+        } else {
+            throw new IllegalArgumentException("Member does not support protocol " +
+                protocolName);
+        }
+    }
+
+    /**
+     * The heartbeat is always considered satisfied when an existing member has made a
+     * successful join/sync group request during a rebalance.
+     *
+     * @return true if heartbeat was satisfied; false otherwise.
+     */
+    public boolean hasSatisfiedHeartbeat() {
+        if (isNew) {
+            // New members can be expired while awaiting join, so we have to check this first
+            return heartbeatSatisfied;
+        } else if (isAwaitingJoin() || isAwaitingSync()) {
+            // Members that are awaiting a rebalance automatically satisfy expected heartbeats
+            return true;
+        } else {
+            // Otherwise, we require the next heartbeat
+            return heartbeatSatisfied;
+        }
+    }
+
+    /**
+     * @param protocols list of protocols to match.
+     * @return true if the given list matches the member's list of supported protocols,
+     *         false otherwise.
+     */
+    public boolean matches(List<Protocol> protocols) {
+        return protocols.equals(this.supportedProtocols);
+    }
+
+    /**
+     * @param protocolName the protocol name.
+     * @return MemberSummary object with metadata corresponding to the protocol name.
+     */
+    public MemberSummary summary(String protocolName) {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            metadata(protocolName),
+            assignment
+        );
+    }
+
+    /**
+     * @return MemberSummary object with no metadata.
+     */
+    public MemberSummary summaryNoMetadata() {
+        return new MemberSummary(
+            memberId,
+            groupInstanceId,
+            clientId,
+            clientHost,
+            new byte[0],
+            new byte[0]
+        );
+    }
+
+    /**
+     * Vote for one of the potential group protocols. This takes into account the protocol preference as
+     * indicated by the order of supported protocols and returns the first one also contained in the set
+     */
+    public String vote(Set<String> candidates) {
+        Optional<Protocol> match = supportedProtocols.stream().filter(protocol ->
+            candidates.contains(protocol.name())).findFirst();
+        
+        if (match.isPresent()) {
+            return match.get().name();
+        } else {
+            throw new IllegalArgumentException("Member does not support any of the candidate protocols");
+        }
+    }
+
+    /**
+     * @param supportedProtocols list of supported protocols.
+     * @return a set of protocol names from the given list of supported protocols.
+     */
+    public static Set<String> plainProtocolSet(List<Protocol> supportedProtocols) {
+        return supportedProtocols.stream().map(Protocol::name).collect(Collectors.toSet());
+    }
+
+    /**
+     * @return the member id.
+     */
+    public String memberId() {
+        return memberId;
+    }
+
+    /**
+     * @return the group instance id.
+     */
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+    /**
+     * @return the client id.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * @return the client host.
+     */
+    public String clientHost() {
+        return clientHost;
+    }
+
+    /**
+     * @return the rebalance timeout in milliseconds.
+     */
+    public int rebalanceTimeoutMs() {
+        return rebalanceTimeoutMs;
+    }
+
+    /**
+     * @return the session timeout in milliseconds.
+     */
+    public int sessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    /**
+     * @return the protocol type.
+     */
+    public String protocolType() {
+        return protocolType;
+    }
+
+    /**
+     * @return the list of supported protocols.
+     */
+    public List<Protocol> supportedProtocols() {
+        return supportedProtocols;
+    }
+
+    public byte[] assignment() {
+        return assignment;
+    }
+
+    /**
+     * @return the awaiting join callback.
+     */
+    public CompletableFuture<JoinGroupResponseData> awaitingJoinCallback() {
+        return awaitingJoinCallback;
+    }
+
+    /**
+     * @param value the updated join callback.
+     */
+    public void setAwaitingJoinCallback(CompletableFuture<JoinGroupResponseData> value) {
+        this.awaitingJoinCallback = value;
+    }
+
+    /**
+     * @return the awaiting sync callback.
+     */
+    public CompletableFuture<SyncGroupResponseData> awaitingSyncCallback() {
+        return awaitingSyncCallback;
+    }
+
+    /**
+     * @param value the updated sync callback.
+     */
+    public void setAwaitingSyncCallback(CompletableFuture<SyncGroupResponseData> value) {
+        this.awaitingSyncCallback = value;
+    }
+
+    /**
+     * @return true if the member is new, false otherwise.
+     */
+    public boolean isNew() {
+        return isNew;
+    }
+
+    /**
+     * @param value true if the member is new, false otherwise.
+     */
+    public void setIsNew(boolean value) {
+        this.isNew = value;
+    }
+
+    public boolean heartBeatSatisfied() {
+        return heartbeatSatisfied;
+    }
+
+    public void setHeartBeatSatisfied(boolean value) {

Review Comment:
   nit: javadoc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1185446129


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+    /**
+     * A builder allowing to create a new generic member or update an
+     * existing one.
+     *
+     * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private Optional<String> groupInstanceId = Optional.empty();
+        private String clientId = "";
+        private String clientHost = "";
+        private int rebalanceTimeoutMs = -1;
+        private int sessionTimeoutMs = -1;
+        private String protocolType = "";
+        private List<Protocol> supportedProtocols = Collections.emptyList();
+        private byte[] assignment = new byte[0];
+
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        public Builder(GenericGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.groupInstanceId = member.groupInstanceId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.sessionTimeoutMs = member.sessionTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.protocolType = member.protocolType;
+            this.supportedProtocols = member.supportedProtocols;
+            this.assignment = member.assignment;
+        }
+
+        public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
+            this.groupInstanceId = groupInstanceId;
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+            this.sessionTimeoutMs = sessionTimeoutMs;
+            return this;
+        }
+
+        public Builder setProtocolType(String protocolType) {
+            this.protocolType = protocolType;
+            return this;
+        }
+
+        public Builder setSupportedProtocols(List<Protocol> protocols) {
+            this.supportedProtocols = protocols;
+            return this;
+        }
+
+        public Builder setAssignment(byte[] assignment) {
+            this.assignment = assignment;
+            return this;
+        }
+
+        public GenericGroupMember build() {
+            return new GenericGroupMember(
+                memberId,
+                groupInstanceId,
+                clientId,
+                clientHost,
+                rebalanceTimeoutMs,
+                sessionTimeoutMs,
+                protocolType,
+                supportedProtocols,
+                assignment
+            );
+        }
+    }
+
+    private static class MemberSummary {

Review Comment:
   got it. i'll remove it in this PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on PR #13644:
URL: https://github.com/apache/kafka/pull/13644#issuecomment-1532302254

   > @jeffkbkim I left a few comments. It seems that the PR does not depend on #13443. You may want to rebase it on trunk and we could merge it when ready.
   
   I don't believe this was built on top of #13443, were you referring to #13637? Anyways, I have rebased to trunk as #13637 was merged. 
   
   @dajac this is ready for another round of review. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1180180911


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##########
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).

Review Comment:
   We should not put javadoc like this. Let's describe what it is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org