You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/03/18 20:26:23 UTC

[kafka] branch trunk updated: KAFKA-7858: Automatically generate JoinGroup request/response

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8406f36  KAFKA-7858: Automatically generate JoinGroup request/response
8406f36 is described below

commit 8406f3624d8f99b614eb7171b71fae8b0e663dcb
Author: Boyang Chen <bc...@outlook.com>
AuthorDate: Mon Mar 18 13:26:09 2019 -0700

    KAFKA-7858: Automatically generate JoinGroup request/response
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../consumer/internals/AbstractCoordinator.java    |  34 +--
 .../consumer/internals/ConsumerCoordinator.java    |  23 +-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   6 +-
 .../kafka/common/requests/AbstractResponse.java    |   2 +-
 .../kafka/common/requests/JoinGroupRequest.java    | 252 ++++-----------------
 .../kafka/common/requests/JoinGroupResponse.java   | 197 ++--------------
 .../common/message/JoinGroupResponse.json          |   2 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  37 ++-
 .../internals/AbstractCoordinatorTest.java         |  32 ++-
 .../internals/ConsumerCoordinatorTest.java         |  79 +++----
 .../kafka/common/requests/RequestResponseTest.java |  56 ++++-
 .../runtime/distributed/WorkerCoordinator.java     |  18 +-
 .../runtime/distributed/WorkerCoordinatorTest.java |  85 +++++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |  63 ++++--
 .../kafka/api/AuthorizerIntegrationTest.scala      |  20 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  20 +-
 16 files changed, 393 insertions(+), 533 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index fd7431b..4ff4e19 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.errors.MemberIdRequiredException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -44,7 +46,6 @@ import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -86,7 +87,7 @@ import java.util.concurrent.atomic.AtomicReference;
  *
  * To leverage this protocol, an implementation must define the format of metadata provided by each
  * member for group registration in {@link #metadata()} and the format of the state assignment provided
- * by the leader in {@link #performAssignment(String, String, Map)} and becomes available to members in
+ * by the leader in {@link #performAssignment(String, String, List)} and becomes available to members in
  * {@link #onJoinComplete(int, String, String, ByteBuffer)}.
  *
  * Note on locking: this class shares state between the caller and a background thread which is
@@ -183,7 +184,7 @@ public abstract class AbstractCoordinator implements Closeable {
      * on the preference).
      * @return Non-empty map of supported protocols and metadata
      */
-    protected abstract List<ProtocolMetadata> metadata();
+    protected abstract JoinGroupRequestData.JoinGroupRequestProtocolSet metadata();
 
     /**
      * Invoked prior to each group join or rejoin. This is typically used to perform any
@@ -202,7 +203,7 @@ public abstract class AbstractCoordinator implements Closeable {
      */
     protected abstract Map<String, ByteBuffer> performAssignment(String leaderId,
                                                                  String protocol,
-                                                                 Map<String, ByteBuffer> allMemberMetadata);
+                                                                 List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata);
 
     /**
      * Invoked when a group member has successfully joined a group. If this call fails with an exception,
@@ -476,7 +477,7 @@ public abstract class AbstractCoordinator implements Closeable {
 
     /**
      * Join the group and return the assignment for the next generation. This function handles both
-     * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
+     * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, List)} if
      * elected leader by the coordinator.
      *
      * NOTE: This is visible only for testing
@@ -490,11 +491,14 @@ public abstract class AbstractCoordinator implements Closeable {
         // send a join group request to the coordinator
         log.info("(Re-)joining group");
         JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
-                groupId,
-                this.sessionTimeoutMs,
-                this.generation.memberId,
-                protocolType(),
-                metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
+                new JoinGroupRequestData()
+                        .setGroupId(groupId)
+                        .setSessionTimeoutMs(this.sessionTimeoutMs)
+                        .setMemberId(this.generation.memberId)
+                        .setProtocolType(protocolType())
+                        .setProtocols(metadata())
+                        .setRebalanceTimeoutMs(this.rebalanceTimeoutMs)
+        );
 
         log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
 
@@ -520,8 +524,8 @@ public abstract class AbstractCoordinator implements Closeable {
                         // the group. In this case, we do not want to continue with the sync group.
                         future.raise(new UnjoinedGroupException());
                     } else {
-                        AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
-                                joinResponse.memberId(), joinResponse.groupProtocol());
+                        AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),
+                                joinResponse.data().memberId(), joinResponse.data().protocolName());
                         if (joinResponse.isLeader()) {
                             onJoinLeader(joinResponse).chain(future);
                         } else {
@@ -562,7 +566,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 // and send another join group request in next cycle.
                 synchronized (AbstractCoordinator.this) {
                     AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
-                        joinResponse.memberId(), null);
+                        joinResponse.data().memberId(), null);
                     AbstractCoordinator.this.rejoinNeeded = true;
                     AbstractCoordinator.this.state = MemberState.UNJOINED;
                 }
@@ -587,8 +591,8 @@ public abstract class AbstractCoordinator implements Closeable {
     private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
         try {
             // perform the leader synchronization and send back the assignment for the group
-            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
-                    joinResponse.members());
+            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
+                    joinResponse.data().members());
 
             SyncGroupRequest.Builder requestBuilder =
                     new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 7990707..2b949a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -40,7 +42,6 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
@@ -166,16 +167,20 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     @Override
-    protected List<ProtocolMetadata> metadata() {
+    protected JoinGroupRequestData.JoinGroupRequestProtocolSet metadata() {
         log.debug("Joining group with current subscription: {}", subscriptions.subscription());
         this.joinedSubscription = subscriptions.subscription();
-        List<ProtocolMetadata> metadataList = new ArrayList<>();
+        JoinGroupRequestData.JoinGroupRequestProtocolSet protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolSet();
+
         for (PartitionAssignor assignor : assignors) {
             Subscription subscription = assignor.subscription(joinedSubscription);
             ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
-            metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
+
+            protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+                    .setName(assignor.name())
+                    .setMetadata(metadata.array()));
         }
-        return metadataList;
+        return protocolSet;
     }
 
     public void updatePatternSubscription(Cluster cluster) {
@@ -385,16 +390,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     @Override
     protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                         String assignmentStrategy,
-                                                        Map<String, ByteBuffer> allSubscriptions) {
+                                                        List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {
         PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
 
         Set<String> allSubscribedTopics = new HashSet<>();
         Map<String, Subscription> subscriptions = new HashMap<>();
-        for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
-            Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
-            subscriptions.put(subscriptionEntry.getKey(), subscription);
+        for (JoinGroupResponseData.JoinGroupResponseMember memberSubScription : allSubscriptions) {
+            Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubScription.metadata()));
+            subscriptions.put(memberSubScription.memberId(), subscription);
             allSubscribedTopics.addAll(subscription.topics());
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index c23aa7e..3f8d80d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.MetadataRequestData;
@@ -81,8 +83,6 @@ import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
-import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaderAndIsrRequest;
 import org.apache.kafka.common.requests.LeaderAndIsrResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
@@ -135,7 +135,7 @@ public enum ApiKeys {
     OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), OffsetFetchResponse.schemaVersions()),
     FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequest.schemaVersions(),
             FindCoordinatorResponse.schemaVersions()),
-    JOIN_GROUP(11, "JoinGroup", JoinGroupRequest.schemaVersions(), JoinGroupResponse.schemaVersions()),
+    JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS),
     HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()),
     LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
     SYNC_GROUP(14, "SyncGroup", SyncGroupRequest.schemaVersions(), SyncGroupResponse.schemaVersions()),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 1d3fd77..eadd302 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -85,7 +85,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case FIND_COORDINATOR:
                 return new FindCoordinatorResponse(struct);
             case JOIN_GROUP:
-                return new JoinGroupResponse(struct);
+                return new JoinGroupResponse(struct, version);
             case HEARTBEAT:
                 return new HeartbeatResponse(struct);
             case LEAVE_GROUP:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 236d684..2f46172 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -16,188 +16,56 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class JoinGroupRequest extends AbstractRequest {
-    private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
-    private static final String REBALANCE_TIMEOUT_KEY_NAME = "rebalance_timeout";
-    private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
-    private static final String GROUP_PROTOCOLS_KEY_NAME = "group_protocols";
-    private static final String PROTOCOL_NAME_KEY_NAME = "protocol_name";
-    private static final String PROTOCOL_METADATA_KEY_NAME = "protocol_metadata";
-
-    /* Join group api */
-    private static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(
-            new Field(PROTOCOL_NAME_KEY_NAME, STRING),
-            new Field(PROTOCOL_METADATA_KEY_NAME, BYTES));
-
-    private static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(
-            GROUP_ID,
-            new Field(SESSION_TIMEOUT_KEY_NAME, INT32, "The coordinator considers the consumer dead if it receives " +
-                    "no heartbeat after this timeout in ms."),
-            MEMBER_ID,
-            new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"),
-            new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " +
-                    "that the member supports"));
-
-    private static final Schema JOIN_GROUP_REQUEST_V1 = new Schema(
-            GROUP_ID,
-            new Field(SESSION_TIMEOUT_KEY_NAME, INT32, "The coordinator considers the consumer dead if it receives no " +
-                    "heartbeat after this timeout in ms."),
-            new Field(REBALANCE_TIMEOUT_KEY_NAME, INT32, "The maximum time that the coordinator will wait for each " +
-                    "member to rejoin when rebalancing the group"),
-            MEMBER_ID,
-            new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"),
-            new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " +
-                    "that the member supports"));
-
-    /* v2 request is the same as v1. Throttle time has been added to response */
-    private static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1;
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema JOIN_GROUP_REQUEST_V3 = JOIN_GROUP_REQUEST_V2;
-
-    /**
-     * The version number is bumped to indicate that client needs to issue a second join group request under first try
-     * with UNKNOWN_MEMBER_ID.
-     */
-    private static final Schema JOIN_GROUP_REQUEST_V4 = JOIN_GROUP_REQUEST_V3;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2,
-            JOIN_GROUP_REQUEST_V3, JOIN_GROUP_REQUEST_V4};
-    }
-
-    public static final String UNKNOWN_MEMBER_ID = "";
-
-    private final String groupId;
-    private final int sessionTimeout;
-    private final int rebalanceTimeout;
-    private final String memberId;
-    private final String protocolType;
-    private final List<ProtocolMetadata> groupProtocols;
-
-    public static class ProtocolMetadata {
-        private final String name;
-        private final ByteBuffer metadata;
-
-        public ProtocolMetadata(String name, ByteBuffer metadata) {
-            this.name = name;
-            this.metadata = metadata;
-        }
-
-        public String name() {
-            return name;
-        }
-
-        public ByteBuffer metadata() {
-            return metadata;
-        }
-    }
 
     public static class Builder extends AbstractRequest.Builder<JoinGroupRequest> {
-        private final String groupId;
-        private final int sessionTimeout;
-        private final String memberId;
-        private final String protocolType;
-        private final List<ProtocolMetadata> groupProtocols;
-        private int rebalanceTimeout = 0;
 
-        public Builder(String groupId, int sessionTimeout, String memberId,
-                       String protocolType, List<ProtocolMetadata> groupProtocols) {
-            super(ApiKeys.JOIN_GROUP);
-            this.groupId = groupId;
-            this.sessionTimeout = sessionTimeout;
-            this.rebalanceTimeout = sessionTimeout;
-            this.memberId = memberId;
-            this.protocolType = protocolType;
-            this.groupProtocols = groupProtocols;
-        }
+        private final JoinGroupRequestData data;
 
-        public Builder setRebalanceTimeout(int rebalanceTimeout) {
-            this.rebalanceTimeout = rebalanceTimeout;
-            return this;
+        public Builder(JoinGroupRequestData data) {
+            super(ApiKeys.JOIN_GROUP);
+            this.data = data;
         }
 
         @Override
         public JoinGroupRequest build(short version) {
-            if (version < 1) {
-                // v0 had no rebalance timeout but used session timeout implicitly
-                rebalanceTimeout = sessionTimeout;
-            }
-            return new JoinGroupRequest(version, groupId, sessionTimeout,
-                    rebalanceTimeout, memberId, protocolType, groupProtocols);
+            return new JoinGroupRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type: JoinGroupRequest").
-                append(", groupId=").append(groupId).
-                append(", sessionTimeout=").append(sessionTimeout).
-                append(", rebalanceTimeout=").append(rebalanceTimeout).
-                append(", memberId=").append(memberId).
-                append(", protocolType=").append(protocolType).
-                append(", groupProtocols=").append(Utils.join(groupProtocols, ", ")).
-                append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    private JoinGroupRequest(short version, String groupId, int sessionTimeout,
-            int rebalanceTimeout, String memberId, String protocolType,
-            List<ProtocolMetadata> groupProtocols) {
-        super(ApiKeys.JOIN_GROUP, version);
-        this.groupId = groupId;
-        this.sessionTimeout = sessionTimeout;
-        this.rebalanceTimeout = rebalanceTimeout;
-        this.memberId = memberId;
-        this.protocolType = protocolType;
-        this.groupProtocols = groupProtocols;
-    }
-
-    public JoinGroupRequest(Struct struct, short versionId) {
-        super(ApiKeys.JOIN_GROUP, versionId);
+    private final JoinGroupRequestData data;
+    private final short version;
 
-        groupId = struct.get(GROUP_ID);
-        sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
+    public static final String UNKNOWN_MEMBER_ID = "";
 
-        if (struct.hasField(REBALANCE_TIMEOUT_KEY_NAME))
-            // rebalance timeout is added in v1
-            rebalanceTimeout = struct.getInt(REBALANCE_TIMEOUT_KEY_NAME);
-        else
-            // v0 had no rebalance timeout but used session timeout implicitly
-            rebalanceTimeout = sessionTimeout;
+    public JoinGroupRequest(JoinGroupRequestData data, short version) {
+        super(ApiKeys.JOIN_GROUP, version);
+        this.data = data;
+        this.version = version;
+    }
 
-        memberId = struct.get(MEMBER_ID);
-        protocolType = struct.getString(PROTOCOL_TYPE_KEY_NAME);
+    public JoinGroupRequest(Struct struct, short version) {
+        super(ApiKeys.JOIN_GROUP, version);
+        this.data = new JoinGroupRequestData(struct, version);
+        this.version = version;
+    }
 
-        groupProtocols = new ArrayList<>();
-        for (Object groupProtocolObj : struct.getArray(GROUP_PROTOCOLS_KEY_NAME)) {
-            Struct groupProtocolStruct = (Struct) groupProtocolObj;
-            String name = groupProtocolStruct.getString(PROTOCOL_NAME_KEY_NAME);
-            ByteBuffer metadata = groupProtocolStruct.getBytes(PROTOCOL_METADATA_KEY_NAME);
-            groupProtocols.add(new ProtocolMetadata(name, metadata));
-        }
+    public JoinGroupRequestData data() {
+        return data;
     }
 
     @Override
@@ -207,77 +75,39 @@ public class JoinGroupRequest extends AbstractRequest {
             case 0:
             case 1:
                 return new JoinGroupResponse(
-                    Errors.forException(e),
-                    JoinGroupResponse.UNKNOWN_GENERATION_ID,
-                    JoinGroupResponse.UNKNOWN_PROTOCOL,
-                    JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
-                    JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
-                    Collections.emptyMap());
+                        new JoinGroupResponseData()
+                                .setErrorCode(Errors.forException(e).code())
+                                .setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID)
+                                .setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL)
+                                .setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID)
+                                .setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID)
+                                .setMembers(Collections.emptyList())
+                );
             case 2:
             case 3:
             case 4:
                 return new JoinGroupResponse(
-                    throttleTimeMs,
-                    Errors.forException(e),
-                    JoinGroupResponse.UNKNOWN_GENERATION_ID,
-                    JoinGroupResponse.UNKNOWN_PROTOCOL,
-                    JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
-                    JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
-                    Collections.emptyMap());
-
+                        new JoinGroupResponseData()
+                                .setThrottleTimeMs(throttleTimeMs)
+                                .setErrorCode(Errors.forException(e).code())
+                                .setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID)
+                                .setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL)
+                                .setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID)
+                                .setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID)
+                                .setMembers(Collections.emptyList())
+                );
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.JOIN_GROUP.latestVersion()));
         }
     }
 
-    public String groupId() {
-        return groupId;
-    }
-
-    public int sessionTimeout() {
-        return sessionTimeout;
-    }
-
-    public int rebalanceTimeout() {
-        return rebalanceTimeout;
-    }
-
-    public String memberId() {
-        return memberId;
-    }
-
-    public List<ProtocolMetadata> groupProtocols() {
-        return groupProtocols;
-    }
-
-    public String protocolType() {
-        return protocolType;
-    }
-
     public static JoinGroupRequest parse(ByteBuffer buffer, short version) {
         return new JoinGroupRequest(ApiKeys.JOIN_GROUP.parseRequest(version, buffer), version);
     }
 
     @Override
     protected Struct toStruct() {
-        short version = version();
-        Struct struct = new Struct(ApiKeys.JOIN_GROUP.requestSchema(version));
-        struct.set(GROUP_ID, groupId);
-        struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
-        if (version >= 1) {
-            struct.set(REBALANCE_TIMEOUT_KEY_NAME, rebalanceTimeout);
-        }
-        struct.set(MEMBER_ID, memberId);
-        struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType);
-        List<Struct> groupProtocolsList = new ArrayList<>(groupProtocols.size());
-        for (ProtocolMetadata protocol : groupProtocols) {
-            Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME);
-            protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name);
-            protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata);
-            groupProtocolsList.add(protocolStruct);
-        }
-        struct.set(GROUP_PROTOCOLS_KEY_NAME, groupProtocolsList.toArray());
-        return struct;
+        return data.toStruct(version);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 55cb97f..0e1644e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -16,217 +16,70 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-
 public class JoinGroupResponse extends AbstractResponse {
 
-    private static final String GROUP_PROTOCOL_KEY_NAME = "group_protocol";
-    private static final String LEADER_ID_KEY_NAME = "leader_id";
-    private static final String MEMBERS_KEY_NAME = "members";
-
-    private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
-
-    private static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(
-            MEMBER_ID,
-            new Field(MEMBER_METADATA_KEY_NAME, BYTES));
-
-    private static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(
-            ERROR_CODE,
-            GENERATION_ID,
-            new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"),
-            new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"),
-            MEMBER_ID,
-            new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
-
-    private static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0;
-
-    private static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema(
-            THROTTLE_TIME_MS,
-            ERROR_CODE,
-            GENERATION_ID,
-            new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"),
-            new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"),
-            MEMBER_ID,
-            new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0)));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema JOIN_GROUP_RESPONSE_V3 = JOIN_GROUP_RESPONSE_V2;
-
-    /**
-     * The version number is bumped to indicate that client needs to issue a second join group request under first try
-     * with UNKNOWN_MEMBER_ID.
-     */
-    private static final Schema JOIN_GROUP_RESPONSE_V4 = JOIN_GROUP_RESPONSE_V3;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2,
-            JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4};
-    }
+    private final JoinGroupResponseData data;
 
     public static final String UNKNOWN_PROTOCOL = "";
     public static final int UNKNOWN_GENERATION_ID = -1;
     public static final String UNKNOWN_MEMBER_ID = "";
 
-    /**
-     * Possible error codes:
-     *
-     * COORDINATOR_LOAD_IN_PROGRESS (14)
-     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR (16)
-     * INCONSISTENT_GROUP_PROTOCOL (23)
-     * UNKNOWN_MEMBER_ID (25)
-     * INVALID_SESSION_TIMEOUT (26)
-     * GROUP_AUTHORIZATION_FAILED (30)
-     * MEMBER_ID_REQUIRED (79)
-     */
-
-    private final int throttleTimeMs;
-    private final Errors error;
-    private final int generationId;
-    private final String groupProtocol;
-    private final String memberId;
-    private final String leaderId;
-    private final Map<String, ByteBuffer> members;
-
-    public JoinGroupResponse(Errors error,
-                             int generationId,
-                             String groupProtocol,
-                             String memberId,
-                             String leaderId,
-                             Map<String, ByteBuffer> groupMembers) {
-        this(DEFAULT_THROTTLE_TIME, error, generationId, groupProtocol, memberId, leaderId, groupMembers);
-    }
-
-    public JoinGroupResponse(int throttleTimeMs,
-            Errors error,
-            int generationId,
-            String groupProtocol,
-            String memberId,
-            String leaderId,
-            Map<String, ByteBuffer> groupMembers) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
-        this.generationId = generationId;
-        this.groupProtocol = groupProtocol;
-        this.memberId = memberId;
-        this.leaderId = leaderId;
-        this.members = groupMembers;
+    public JoinGroupResponse(JoinGroupResponseData data) {
+        this.data = data;
     }
 
     public JoinGroupResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        members = new HashMap<>();
-
-        for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) {
-            Struct memberData = (Struct) memberDataObj;
-            String memberId = memberData.get(MEMBER_ID);
-            ByteBuffer memberMetadata = memberData.getBytes(MEMBER_METADATA_KEY_NAME);
-            members.put(memberId, memberMetadata);
-        }
-        error = Errors.forCode(struct.get(ERROR_CODE));
-        generationId = struct.get(GENERATION_ID);
-        groupProtocol = struct.getString(GROUP_PROTOCOL_KEY_NAME);
-        memberId = struct.get(MEMBER_ID);
-        leaderId = struct.getString(LEADER_ID_KEY_NAME);
-    }
-
-    @Override
-    public int throttleTimeMs() {
-        return throttleTimeMs;
-    }
-
-    public Errors error() {
-        return error;
+        short latestVersion = (short) (JoinGroupResponseData.SCHEMAS.length - 1);
+        this.data = new JoinGroupResponseData(struct, latestVersion);
     }
 
-    @Override
-    public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
-    }
-
-    public int generationId() {
-        return generationId;
+    public JoinGroupResponse(Struct struct, short version) {
+        this.data = new JoinGroupResponseData(struct, version);
     }
 
-    public String groupProtocol() {
-        return groupProtocol;
+    public JoinGroupResponseData data() {
+        return data;
     }
 
-    public String memberId() {
-        return memberId;
+    public boolean isLeader() {
+        return data.memberId().equals(data.leader());
     }
 
-    public String leaderId() {
-        return leaderId;
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
     }
 
-    public boolean isLeader() {
-        return memberId.equals(leaderId);
+    public Errors error() {
+        return Errors.forCode(data.errorCode());
     }
 
-    public Map<String, ByteBuffer> members() {
-        return members;
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
     }
 
-    public static JoinGroupResponse parse(ByteBuffer buffer, short version) {
-        return new JoinGroupResponse(ApiKeys.JOIN_GROUP.parseResponse(version, buffer));
+    public static JoinGroupResponse parse(ByteBuffer buffer, short versionId) {
+        return new JoinGroupResponse(ApiKeys.JOIN_GROUP.parseResponse(versionId, buffer), versionId);
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.JOIN_GROUP.responseSchema(version));
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-
-        struct.set(ERROR_CODE, error.code());
-        struct.set(GENERATION_ID, generationId);
-        struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol);
-        struct.set(MEMBER_ID, memberId);
-        struct.set(LEADER_ID_KEY_NAME, leaderId);
-
-        List<Struct> memberArray = new ArrayList<>();
-        for (Map.Entry<String, ByteBuffer> entries : members.entrySet()) {
-            Struct memberData = struct.instance(MEMBERS_KEY_NAME);
-            memberData.set(MEMBER_ID, entries.getKey());
-            memberData.set(MEMBER_METADATA_KEY_NAME, entries.getValue());
-            memberArray.add(memberData);
-        }
-        struct.set(MEMBERS_KEY_NAME, memberArray.toArray());
-
-        return struct;
+        return data.toStruct(version);
     }
 
     @Override
     public String toString() {
-        return "JoinGroupResponse" +
-            "(throttleTimeMs=" + throttleTimeMs +
-            ", error=" + error +
-            ", generationId=" + generationId +
-            ", groupProtocol=" + groupProtocol +
-            ", memberId=" + memberId +
-            ", leaderId=" + leaderId +
-            ", members=" + ((members == null) ? "null" :
-                Utils.join(members.keySet(), ",")) + ")";
+        return data.toString();
     }
 
     @Override
diff --git a/clients/src/main/resources/common/message/JoinGroupResponse.json b/clients/src/main/resources/common/message/JoinGroupResponse.json
index fb36bb0..a1c6a70 100644
--- a/clients/src/main/resources/common/message/JoinGroupResponse.json
+++ b/clients/src/main/resources/common/message/JoinGroupResponse.json
@@ -28,7 +28,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
-    { "name": "GenerationId", "type": "int32", "versions": "0+",
+    { "name": "GenerationId", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The generation ID of the group." },
     { "name": "ProtocolName", "type": "string", "versions": "0+",
       "about": "The group protocol selected by the coordinator." },
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index cd0a76f..e17a6ef 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -39,6 +39,8 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.Metrics;
@@ -89,6 +91,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -1420,8 +1423,20 @@ public class KafkaConsumerTest {
         final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(singletonList(topic)));
 
         // This member becomes the leader
-        final JoinGroupResponse leaderResponse = new JoinGroupResponse(Errors.NONE, 1, assignor.name(), "memberId", "memberId",
-                Collections.singletonMap("memberId", byteBuffer));
+        String memberId = "memberId";
+        final JoinGroupResponse leaderResponse = new JoinGroupResponse(
+                new JoinGroupResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGenerationId(1).setProtocolName(assignor.name())
+                        .setLeader(memberId).setMemberId(memberId)
+                        .setMembers(Collections.singletonList(
+                                new JoinGroupResponseData.JoinGroupResponseMember()
+                                        .setMemberId(memberId)
+                                        .setMetadata(byteBuffer.array())
+                                )
+                        )
+        );
+
         client.prepareResponseFrom(leaderResponse, coordinator);
 
         // sync group fails due to disconnect
@@ -1635,7 +1650,12 @@ public class KafkaConsumerTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
-                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(joinGroupRequest.groupProtocols().get(0).metadata());
+                Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocolIterator =
+                        joinGroupRequest.data().protocols().iterator();
+                assertTrue(protocolIterator.hasNext());
+
+                ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata());
+                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata);
                 return subscribedTopics.equals(new HashSet<>(subscription.topics()));
             }
         }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
@@ -1707,8 +1727,15 @@ public class KafkaConsumerTest {
     }
 
     private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) {
-        return new JoinGroupResponse(error, generationId, assignor.name(), memberId, leaderId,
-                Collections.emptyMap());
+        return new JoinGroupResponse(
+                new JoinGroupResponseData()
+                        .setErrorCode(error.code())
+                        .setGenerationId(generationId)
+                        .setProtocolName(assignor.name())
+                        .setLeader(leaderId)
+                        .setMemberId(memberId)
+                        .setMembers(Collections.emptyList())
+        );
     }
 
     private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 9a68db7..dc8ea46 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -215,7 +217,7 @@ public class AbstractCoordinatorTest {
                     return false;
                 }
                 JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
-                if (!joinGroupRequest.memberId().equals(memberId)) {
+                if (!joinGroupRequest.data().memberId().equals(memberId)) {
                     return false;
                 }
                 return true;
@@ -691,8 +693,15 @@ public class AbstractCoordinatorTest {
     }
 
     private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
-        return new JoinGroupResponse(error, generationId, "dummy-subprotocol", memberId, leaderId,
-                Collections.<String, ByteBuffer>emptyMap());
+        return new JoinGroupResponse(
+                new JoinGroupResponseData()
+                        .setErrorCode(error.code())
+                        .setGenerationId(generationId)
+                        .setProtocolName("dummy-subprotocol")
+                        .setMemberId(memberId)
+                        .setLeader(leaderId)
+                        .setMembers(Collections.emptyList())
+        );
     }
 
     private JoinGroupResponse joinGroupResponse(Errors error) {
@@ -725,15 +734,22 @@ public class AbstractCoordinatorTest {
         }
 
         @Override
-        protected List<JoinGroupRequest.ProtocolMetadata> metadata() {
-            return Collections.singletonList(new JoinGroupRequest.ProtocolMetadata("dummy-subprotocol", EMPTY_DATA));
+        protected JoinGroupRequestData.JoinGroupRequestProtocolSet metadata() {
+            return new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+                    Collections.singleton(new JoinGroupRequestData.JoinGroupRequestProtocol()
+                            .setName("dummy-subprotocol")
+                            .setMetadata(EMPTY_DATA.array())).iterator()
+            );
         }
 
         @Override
-        protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
+        protected Map<String, ByteBuffer> performAssignment(String leaderId,
+                                                            String protocol,
+                                                            List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata) {
             Map<String, ByteBuffer> assignment = new HashMap<>();
-            for (Map.Entry<String, ByteBuffer> metadata : allMemberMetadata.entrySet())
-                assignment.put(metadata.getKey(), EMPTY_DATA);
+            for (JoinGroupResponseData.JoinGroupResponseMember member : allMemberMetadata) {
+                assignment.put(member.memberId(), EMPTY_DATA);
+            }
             return assignment;
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index b079963..49bbcec 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -23,9 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
-import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -39,6 +37,8 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -46,7 +46,6 @@ import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
@@ -71,6 +70,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -584,9 +584,14 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 JoinGroupRequest join = (JoinGroupRequest) body;
-                ProtocolMetadata protocolMetadata = join.groupProtocols().iterator().next();
-                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata.metadata());
-                protocolMetadata.metadata().rewind();
+                Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocolIterator =
+                        join.data().protocols().iterator();
+                assertTrue(protocolIterator.hasNext());
+                JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
+
+                ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
+                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
+                metadata.rewind();
                 return subscription.topics().containsAll(updatedSubscriptionSet);
             }
         }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE));
@@ -895,7 +900,7 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 JoinGroupRequest joinRequest = (JoinGroupRequest) body;
-                return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
+                return joinRequest.data().memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
             }
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
@@ -947,7 +952,7 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 JoinGroupRequest joinRequest = (JoinGroupRequest) body;
-                return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
+                return joinRequest.data().memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
             }
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
@@ -1479,7 +1484,8 @@ public class ConsumerCoordinatorTest {
         joinAsFollowerAndReceiveAssignment("consumer", coordinator, singletonList(t1p));
 
         // now switch to manual assignment
-        client.prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
+        client.prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
+                .setErrorCode(Errors.NONE.code())));
         subscriptions.unsubscribe();
         coordinator.maybeLeaveGroup();
         subscriptions.assignFromUser(singleton(t1p));
@@ -1851,30 +1857,6 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
-    public void testProtocolMetadataOrder() {
-        RoundRobinAssignor roundRobin = new RoundRobinAssignor();
-        RangeAssignor range = new RangeAssignor();
-
-        try (Metrics metrics = new Metrics(time)) {
-            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range),
-                    false, true);
-            List<ProtocolMetadata> metadata = coordinator.metadata();
-            assertEquals(2, metadata.size());
-            assertEquals(roundRobin.name(), metadata.get(0).name());
-            assertEquals(range.name(), metadata.get(1).name());
-        }
-
-        try (Metrics metrics = new Metrics(time)) {
-            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin),
-                    false, true);
-            List<ProtocolMetadata> metadata = coordinator.metadata();
-            assertEquals(2, metadata.size());
-            assertEquals(range.name(), metadata.get(0).name());
-            assertEquals(roundRobin.name(), metadata.get(1).name());
-        }
-    }
-
-    @Test
     public void testThreadSafeAssignedPartitionsMetric() throws Exception {
         // Get the assigned-partitions metric
         final Metric metric = metrics.metric(new MetricName("assigned-partitions", "consumer" + groupId + "-coordinator-metrics",
@@ -2131,7 +2113,8 @@ public class ConsumerCoordinatorTest {
                 LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
                 return leaveRequest.data().groupId().equals(groupId);
             }
-        }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
+        }, new LeaveGroupResponse(new LeaveGroupResponseData()
+                .setErrorCode(Errors.NONE.code())));
 
         coordinator.close();
         assertTrue("Commit not requested", commitRequested.get());
@@ -2174,18 +2157,36 @@ public class ConsumerCoordinatorTest {
                                                       String memberId,
                                                       Map<String, List<String>> subscriptions,
                                                       Errors error) {
-        Map<String, ByteBuffer> metadata = new HashMap<>();
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
         for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
             PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue());
             ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription);
-            metadata.put(subscriptionEntry.getKey(), buf);
+            metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                    .setMemberId(subscriptionEntry.getKey())
+                    .setMetadata(buf.array()));
         }
-        return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, memberId, metadata);
+
+        return new JoinGroupResponse(
+                new JoinGroupResponseData()
+                        .setErrorCode(error.code())
+                        .setGenerationId(generationId)
+                        .setProtocolName(partitionAssignor.name())
+                        .setLeader(memberId)
+                        .setMemberId(memberId)
+                        .setMembers(metadata)
+        );
     }
 
     private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
-        return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, leaderId,
-                Collections.emptyMap());
+        return new JoinGroupResponse(
+                new JoinGroupResponseData()
+                        .setErrorCode(error.code())
+                        .setGenerationId(generationId)
+                        .setProtocolName(partitionAssignor.name())
+                        .setLeader(leaderId)
+                        .setMemberId(memberId)
+                        .setMembers(Collections.emptyList())
+        );
     }
 
     private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a483500..5690743 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -45,6 +45,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPar
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.SaslAuthenticateRequestData;
@@ -648,7 +650,7 @@ public class RequestResponseTest {
         final short version = 0;
         JoinGroupRequest jgr = createJoinGroupRequest(version);
         JoinGroupRequest jgr2 = new JoinGroupRequest(jgr.toStruct(), version);
-        assertEquals(jgr2.rebalanceTimeout(), jgr.rebalanceTimeout());
+        assertEquals(jgr2.data().rebalanceTimeoutMs(), jgr.data().rebalanceTimeoutMs());
     }
 
     @Test
@@ -742,23 +744,53 @@ public class RequestResponseTest {
     }
 
     private JoinGroupRequest createJoinGroupRequest(int version) {
-        ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
-        List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();
-        protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", metadata));
+        JoinGroupRequestData.JoinGroupRequestProtocolSet protocols = new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+                Collections.singleton(
+                new JoinGroupRequestData.JoinGroupRequestProtocol()
+                        .setName("consumer-range")
+                        .setMetadata(new byte[0])).iterator()
+        );
         if (version == 0) {
-            return new JoinGroupRequest.Builder("group1", 30000, "consumer1", "consumer", protocols).
-                    build((short) version);
+            return new JoinGroupRequest.Builder(
+                    new JoinGroupRequestData()
+                            .setGroupId("group1")
+                            .setSessionTimeoutMs(30000)
+                            .setMemberId("consumer1")
+                            .setProtocolType("consumer")
+                            .setProtocols(protocols))
+                    .build((short) version);
         } else {
-            return new JoinGroupRequest.Builder("group1", 10000, "consumer1", "consumer", protocols).
-                    setRebalanceTimeout(60000).build();
+            return new JoinGroupRequest.Builder(
+                    new JoinGroupRequestData()
+                            .setGroupId("group1")
+                            .setSessionTimeoutMs(30000)
+                            .setMemberId("consumer1")
+                            .setProtocolType("consumer")
+                            .setProtocols(protocols)
+                            .setRebalanceTimeoutMs(60000)) // v1 and above contain the rebalance timeout
+                    .build((short) version);
         }
     }
 
     private JoinGroupResponse createJoinGroupResponse() {
-        Map<String, ByteBuffer> members = new HashMap<>();
-        members.put("consumer1", ByteBuffer.wrap(new byte[]{}));
-        members.put("consumer2", ByteBuffer.wrap(new byte[]{}));
-        return new JoinGroupResponse(Errors.NONE, 1, "range", "consumer1", "leader", members);
+        List<JoinGroupResponseData.JoinGroupResponseMember> members = Arrays.asList(
+                new JoinGroupResponseData.JoinGroupResponseMember()
+                        .setMemberId("consumer1")
+                        .setMetadata(new byte[0]),
+                new JoinGroupResponseData.JoinGroupResponseMember()
+                        .setMemberId("consumer2")
+                        .setMetadata(new byte[0])
+        );
+
+        return new JoinGroupResponse(
+                new JoinGroupResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGenerationId(1)
+                        .setProtocolName("range")
+                        .setLeader("leader")
+                        .setMemberId("consumer1")
+                        .setMembers(members)
+        );
     }
 
     private ListGroupsRequest createListGroupsRequest() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 103a323..fa89a9d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -18,11 +18,12 @@ package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.utils.CircularIterator;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -144,11 +145,26 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
+    /**
+     * Snapshots the current config state and advertises this worker's REST URL and config
+     * offset as the metadata of a single protocol entry.
+     *
+     * @return a protocol set holding one entry named {@code DEFAULT_SUBPROTOCOL}
+     */
     @Override
-    public List<ProtocolMetadata> metadata() {
+    public JoinGroupRequestData.JoinGroupRequestProtocolSet metadata() {
         configSnapshot = configStorage.snapshot();
         ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(restUrl, configSnapshot.offset());
         ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
-        return Collections.singletonList(new ProtocolMetadata(DEFAULT_SUBPROTOCOL, metadata));
+        // Copy only the readable bytes: ByteBuffer.array() returns the whole backing array,
+        // ignoring position, limit and array offset, and throws for direct or read-only buffers.
+        byte[] metadataBytes = new byte[metadata.remaining()];
+        metadata.duplicate().get(metadataBytes);
+        return new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+                Collections.singleton(new JoinGroupRequestData.JoinGroupRequestProtocol()
+                        .setName(DEFAULT_SUBPROTOCOL)
+                        .setMetadata(metadataBytes))
+                        .iterator()
+        );
     }
 
     @Override
@@ -163,12 +169,12 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
+    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata) {
         log.debug("Performing task assignment");
 
         Map<String, ConnectProtocol.WorkerState> memberConfigs = new HashMap<>();
-        for (Map.Entry<String, ByteBuffer> entry : allMemberMetadata.entrySet())
-            memberConfigs.put(entry.getKey(), ConnectProtocol.deserializeMetadata(entry.getValue()));
+        for (JoinGroupResponseData.JoinGroupResponseMember memberMetadata : allMemberMetadata)
+            memberConfigs.put(memberMetadata.memberId(), ConnectProtocol.deserializeMetadata(ByteBuffer.wrap(memberMetadata.metadata())));
 
         long maxOffset = findMaxMemberConfigOffset(memberConfigs);
         Long leaderOffset = ensureLeaderConfig(maxOffset);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 68d236f..c78c7a4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -21,11 +21,12 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -44,15 +45,18 @@ import org.powermock.api.easymock.PowerMock;
 import org.powermock.reflect.Whitebox;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class WorkerCoordinatorTest {
 
@@ -186,12 +190,15 @@ public class WorkerCoordinatorTest {
 
         PowerMock.replayAll();
 
-        List<ProtocolMetadata> serialized = coordinator.metadata();
+        JoinGroupRequestData.JoinGroupRequestProtocolSet serialized = coordinator.metadata();
         assertEquals(1, serialized.size());
 
-        ProtocolMetadata defaultMetadata = serialized.get(0);
+        Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocolIterator = serialized.iterator();
+        assertTrue(protocolIterator.hasNext());
+        JoinGroupRequestData.JoinGroupRequestProtocol defaultMetadata = protocolIterator.next();
         assertEquals(WorkerCoordinator.DEFAULT_SUBPROTOCOL, defaultMetadata.name());
-        ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(defaultMetadata.metadata());
+        ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(
+                ByteBuffer.wrap(defaultMetadata.metadata()));
         assertEquals(1, state.offset());
 
         PowerMock.verifyAll();
@@ -364,11 +371,17 @@ public class WorkerCoordinatorTest {
         // Prime the current configuration state
         coordinator.metadata();
 
-        Map<String, ByteBuffer> configs = new HashMap<>();
         // Mark everyone as in sync with configState1
-        configs.put("leader", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)));
-        configs.put("member", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)));
-        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+        List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = new ArrayList<>();
+        responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId("leader")
+                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)).array())
+        );
+        responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId("member")
+                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
+        );
+        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
 
         // configState1 has 1 connector, 1 task
         ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
@@ -400,11 +413,18 @@ public class WorkerCoordinatorTest {
         // Prime the current configuration state
         coordinator.metadata();
 
-        Map<String, ByteBuffer> configs = new HashMap<>();
         // Mark everyone as in sync with configState1
-        configs.put("leader", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)));
-        configs.put("member", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)));
-        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+        List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = new ArrayList<>();
+        responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId("leader")
+                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)).array())
+        );
+        responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId("member")
+                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
+        );
+
+        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
 
         // configState2 has 2 connector, 3 tasks and should trigger round robin assignment
         ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
@@ -436,11 +456,18 @@ public class WorkerCoordinatorTest {
         // Prime the current configuration state
         coordinator.metadata();
 
-        Map<String, ByteBuffer> configs = new HashMap<>();
         // Mark everyone as in sync with configState1
-        configs.put("leader", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)));
-        configs.put("member", ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)));
-        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+        List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = new ArrayList<>();
+        responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId("leader")
+                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)).array())
+        );
+        responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId("member")
+                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
+        );
+
+        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
 
         // Round robin assignment when there are the same number of connectors and tasks should result in each being
         // evenly distributed across the workers, i.e. round robin assignment of connectors first, then followed by tasks
@@ -468,20 +495,36 @@ public class WorkerCoordinatorTest {
 
     private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,
                                            Map<String, Long> configOffsets, Errors error) {
-        Map<String, ByteBuffer> metadata = new HashMap<>();
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>(); // one entry is added per configOffsets key
         for (Map.Entry<String, Long> configStateEntry : configOffsets.entrySet()) {
             // We need a member URL, but it doesn't matter for the purposes of this test. Just set it to the member ID
             String memberUrl = configStateEntry.getKey();
             long configOffset = configStateEntry.getValue();
             ByteBuffer buf = ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(memberUrl, configOffset));
-            metadata.put(configStateEntry.getKey(), buf);
+            metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                    .setMemberId(configStateEntry.getKey())
+                    .setMetadata(buf.array()) // NOTE(review): array() returns the whole backing array; assumes buf spans it exactly - confirm
+            );
         }
-        return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata);
+        return new JoinGroupResponse( // the response now wraps a JoinGroupResponseData message
+                new JoinGroupResponseData().setErrorCode(error.code())
+                .setGenerationId(generationId)
+                .setProtocolName(WorkerCoordinator.DEFAULT_SUBPROTOCOL)
+                .setLeader(memberId) // leader and member id are both memberId, as in the removed constructor call
+                .setMemberId(memberId)
+                .setMembers(metadata)
+        );
     }
 
     private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
-        return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId,
-                Collections.<String, ByteBuffer>emptyMap());
+        return new JoinGroupResponse( // the response now wraps a JoinGroupResponseData message
+                new JoinGroupResponseData().setErrorCode(error.code())
+                        .setGenerationId(generationId)
+                        .setProtocolName(WorkerCoordinator.DEFAULT_SUBPROTOCOL)
+                        .setLeader(leaderId) // leader id is supplied separately from memberId
+                        .setMemberId(memberId)
+                        .setMembers(Collections.emptyList()) // empty member list replaces the old empty map
+        );
     }
 
     private SyncGroupResponse syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0b73341..68d0823 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -45,9 +45,11 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import org.apache.kafka.common.message.{CreateTopicsResponseData, DescribeGroupsResponseData}
+import org.apache.kafka.common.message.CreateTopicsResponseData
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
+import org.apache.kafka.common.message.DescribeGroupsResponseData
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
+import org.apache.kafka.common.message.JoinGroupResponseData
 import org.apache.kafka.common.message.LeaveGroupResponseData
 import org.apache.kafka.common.message.SaslAuthenticateResponseData
 import org.apache.kafka.common.message.SaslHandshakeResponseData
@@ -1282,10 +1284,23 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending a join-group response
     def sendResponseCallback(joinResult: JoinGroupResult) {
-      val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
+      val members = joinResult.members map { case (memberId, metadataArray) =>
+        new JoinGroupResponseData.JoinGroupResponseMember()
+          .setMemberId(memberId)
+          .setMetadata(metadataArray)
+      }
+
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new JoinGroupResponse(requestThrottleMs, joinResult.error, joinResult.generationId,
-          joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
+        val responseBody = new JoinGroupResponse(
+          new JoinGroupResponseData()
+            .setThrottleTimeMs(requestThrottleMs)
+            .setErrorCode(joinResult.error.code())
+            .setGenerationId(joinResult.generationId)
+            .setProtocolName(joinResult.subProtocol)
+            .setLeader(joinResult.leaderId)
+            .setMemberId(joinResult.memberId)
+            .setMembers(members.toSeq.asJava)
+        )
 
         trace("Sending join group response %s for correlation id %d to client %s."
           .format(responseBody, request.header.correlationId, request.header.clientId))
@@ -1294,33 +1309,35 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(request, createResponse)
     }
 
-    if (!authorize(request.session, Read, Resource(Group, joinGroupRequest.groupId(), LITERAL))) {
+    if (!authorize(request.session, Read, Resource(Group, joinGroupRequest.data().groupId(), LITERAL))) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new JoinGroupResponse(
-          requestThrottleMs,
-          Errors.GROUP_AUTHORIZATION_FAILED,
-          JoinGroupResponse.UNKNOWN_GENERATION_ID,
-          JoinGroupResponse.UNKNOWN_PROTOCOL,
-          JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
-          JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
-          Collections.emptyMap())
+          new JoinGroupResponseData()
+            .setThrottleTimeMs(requestThrottleMs)
+            .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())
+            .setGenerationId(JoinGroupResponse.UNKNOWN_GENERATION_ID)
+            .setProtocolName(JoinGroupResponse.UNKNOWN_PROTOCOL)
+            .setLeader(JoinGroupResponse.UNKNOWN_MEMBER_ID)
+            .setMemberId(JoinGroupResponse.UNKNOWN_MEMBER_ID)
+            .setMembers(Collections.emptyList())
+        )
       )
     } else {
       // Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
       val requireKnownMemberId = joinGroupRequest.version >= 4
 
       // let the coordinator handle join-group
-      val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
-        (protocol.name, Utils.toArray(protocol.metadata))).toList
+      val protocols = joinGroupRequest.data().protocols().asScala.map(protocol =>
+        (protocol.name, protocol.metadata)).toList
       groupCoordinator.handleJoinGroup(
-        joinGroupRequest.groupId,
-        joinGroupRequest.memberId,
+        joinGroupRequest.data().groupId,
+        joinGroupRequest.data().memberId,
         requireKnownMemberId,
         request.header.clientId,
         request.session.clientAddress.toString,
-        joinGroupRequest.rebalanceTimeout,
-        joinGroupRequest.sessionTimeout,
-        joinGroupRequest.protocolType,
+        joinGroupRequest.data().rebalanceTimeoutMs,
+        joinGroupRequest.data().sessionTimeoutMs,
+        joinGroupRequest.data().protocolType,
         protocols,
         sendResponseCallback)
     }
@@ -1396,9 +1413,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(error: Errors) {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
         val response = new LeaveGroupResponse(new LeaveGroupResponseData()
-          .setThrottleTimeMs(requestThrottleMs).setErrorCode(error.code()))
+          .setThrottleTimeMs(requestThrottleMs)
+          .setErrorCode(error.code()))
         trace("Sending leave group response %s for correlation id %d to client %s."
-                    .format(response, request.header.correlationId, request.header.clientId))
+          .format(response, request.header.correlationId, request.header.clientId))
         response
       }
       sendResponseMaybeThrottle(request, createResponse)
@@ -1407,7 +1425,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (!authorize(request.session, Read, Resource(Group, leaveGroupRequest.data().groupId(), LITERAL))) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         new LeaveGroupResponse(new LeaveGroupResponseData()
-          .setThrottleTimeMs(requestThrottleMs).setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())))
+          .setThrottleTimeMs(requestThrottleMs)
+          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())))
     } else {
       // let the coordinator to handle leave-group
       groupCoordinator.handleLeaveGroup(
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 044f595..30cc161 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
-import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, LeaveGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData}
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -310,9 +310,21 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createJoinGroupRequest = {
-    new JoinGroupRequest.Builder(group, 10000, "", "consumer",
-      List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
-      .setRebalanceTimeout(60000).build()
+    val protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolSet( // holds a single "consumer-range" protocol entry
+      Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol()
+        .setName("consumer-range")
+        .setMetadata("test".getBytes()) // NOTE(review): getBytes() without a charset uses the platform default
+    ).iterator()) // the first ")" closes singletonList(...), the last one closes the protocol set constructor
+
+    new JoinGroupRequest.Builder(
+      new JoinGroupRequestData()
+        .setGroupId(group)
+        .setSessionTimeoutMs(10000)
+        .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) // takes the place of the "" argument in the removed Builder call
+        .setProtocolType("consumer")
+        .setProtocols(protocolSet)
+        .setRebalanceTimeoutMs(60000)
+    ).build() // build() is called without an explicit version
   }
 
   private def createSyncGroupRequest = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index d070e46..32541d3 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,7 +24,7 @@ import kafka.security.auth._
 import kafka.utils.TestUtils
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
@@ -254,9 +254,21 @@ class RequestQuotaTest extends BaseRequestTest {
           new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
 
         case ApiKeys.JOIN_GROUP =>
-          new JoinGroupRequest.Builder("test-join-group", 200, "", "consumer",
-            List(new JoinGroupRequest.ProtocolMetadata("consumer-range", ByteBuffer.wrap("test".getBytes()))).asJava)
-           .setRebalanceTimeout(100)
+          new JoinGroupRequest.Builder(
+            new JoinGroupRequestData()
+              .setGroupId("test-join-group")
+              .setSessionTimeoutMs(200)
+              .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+              .setProtocolType("consumer")
+              .setProtocols(
+                new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+                  Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol()
+                    .setName("consumer-range")
+                    .setMetadata("test".getBytes())).iterator()
+                )
+              )
+              .setRebalanceTimeoutMs(100)
+          )
 
         case ApiKeys.HEARTBEAT =>
           new HeartbeatRequest.Builder("test-group", 1, "")