You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/15 22:09:14 UTC

[kafka] branch trunk updated: KAFKA-8354; Replace Sync group request/response with automated protocol (#6729)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2208f99  KAFKA-8354; Replace Sync group request/response with automated protocol (#6729)
2208f99 is described below

commit 2208f9966d31a3f52bf9b63938bd6f3729378d9b
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Wed May 15 15:09:00 2019 -0700

    KAFKA-8354; Replace Sync group request/response with automated protocol (#6729)
    
    Update SyncGroup API to use the generated protocol classes.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/AbstractCoordinator.java    |  30 ++++-
 .../consumer/internals/ConsumerCoordinator.java    |   2 +-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  10 +-
 .../kafka/common/requests/AbstractResponse.java    |   2 +-
 .../kafka/common/requests/SyncGroupRequest.java    | 139 +++++----------------
 .../kafka/common/requests/SyncGroupResponse.java   |  79 +++---------
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   7 +-
 .../internals/AbstractCoordinatorTest.java         |   7 +-
 .../internals/ConsumerCoordinatorTest.java         |  62 ++++-----
 .../runtime/distributed/WorkerCoordinatorTest.java |  26 ++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |  22 +++-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  10 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   8 +-
 13 files changed, 169 insertions(+), 235 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index ebe8231..cbfd76e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -58,10 +59,12 @@ import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -589,8 +592,13 @@ public abstract class AbstractCoordinator implements Closeable {
     private RequestFuture<ByteBuffer> onJoinFollower() {
         // send follower's sync group with an empty assignment
         SyncGroupRequest.Builder requestBuilder =
-                new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
-                        Collections.<String, ByteBuffer>emptyMap());
+                new SyncGroupRequest.Builder(
+                        new SyncGroupRequestData()
+                                .setGroupId(groupId)
+                                .setMemberId(generation.memberId)
+                                .setGenerationId(generation.generationId)
+                                .setAssignments(Collections.emptyList())
+                );
         log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
         return sendSyncGroupRequest(requestBuilder);
     }
@@ -601,8 +609,22 @@ public abstract class AbstractCoordinator implements Closeable {
             Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
                     joinResponse.data().members());
 
+            List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
+            for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
+                groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
+                        .setMemberId(assignment.getKey())
+                        .setAssignment(Utils.toArray(assignment.getValue()))
+                );
+            }
+
             SyncGroupRequest.Builder requestBuilder =
-                    new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
+                    new SyncGroupRequest.Builder(
+                            new SyncGroupRequestData()
+                                    .setGroupId(groupId)
+                                    .setMemberId(generation.memberId)
+                                    .setGenerationId(generation.generationId)
+                                    .setAssignments(groupAssignmentList)
+                    );
             log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
             return sendSyncGroupRequest(requestBuilder);
         } catch (RuntimeException e) {
@@ -624,7 +646,7 @@ public abstract class AbstractCoordinator implements Closeable {
             Errors error = syncResponse.error();
             if (error == Errors.NONE) {
                 sensors.syncLatency.record(response.requestLatencyMs());
-                future.complete(syncResponse.memberAssignment());
+                future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
             } else {
                 requestRejoin();
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 95cff78..6833579 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -182,7 +182,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
             protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
                     .setName(assignor.name())
-                    .setMetadata(metadata.array()));
+                    .setMetadata(Utils.toArray(metadata)));
         }
         return protocolSet;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 3e2a87a..a7e3757 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -28,6 +28,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -42,8 +44,8 @@ import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeRequestData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
-import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -103,8 +105,6 @@ import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
 import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
 import org.apache.kafka.common.requests.StopReplicaRequest;
 import org.apache.kafka.common.requests.StopReplicaResponse;
-import org.apache.kafka.common.requests.SyncGroupRequest;
-import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
 import org.apache.kafka.common.requests.UpdateMetadataRequest;
@@ -140,7 +140,7 @@ public enum ApiKeys {
     JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS),
     HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()),
     LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
-    SYNC_GROUP(14, "SyncGroup", SyncGroupRequest.schemaVersions(), SyncGroupResponse.schemaVersions()),
+    SYNC_GROUP(14, "SyncGroup", SyncGroupRequestData.SCHEMAS, SyncGroupResponseData.SCHEMAS),
     DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequestData.SCHEMAS,
             DescribeGroupsResponseData.SCHEMAS),
     LIST_GROUPS(16, "ListGroups", ListGroupsRequest.schemaVersions(), ListGroupsResponse.schemaVersions()),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 7ba9b7a..a7f5a38 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -91,7 +91,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case LEAVE_GROUP:
                 return new LeaveGroupResponse(struct, version);
             case SYNC_GROUP:
-                return new SyncGroupResponse(struct);
+                return new SyncGroupResponse(struct, version);
             case STOP_REPLICA:
                 return new StopReplicaResponse(struct);
             case CONTROLLED_SHUTDOWN:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index 237320f..37a2451 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -16,112 +16,48 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-
 public class SyncGroupRequest extends AbstractRequest {
-    private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
-    private static final String GROUP_ASSIGNMENT_KEY_NAME = "group_assignment";
-
-    private static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(
-            MEMBER_ID,
-            new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES));
-    private static final Schema SYNC_GROUP_REQUEST_V0 = new Schema(
-            GROUP_ID,
-            GENERATION_ID,
-            MEMBER_ID,
-            new Field(GROUP_ASSIGNMENT_KEY_NAME, new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0)));
-
-    /* v1 request is the same as v0. Throttle time has been added to response */
-    private static final Schema SYNC_GROUP_REQUEST_V1 = SYNC_GROUP_REQUEST_V0;
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema SYNC_GROUP_REQUEST_V2 = SYNC_GROUP_REQUEST_V1;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1,
-            SYNC_GROUP_REQUEST_V2};
-    }
 
     public static class Builder extends AbstractRequest.Builder<SyncGroupRequest> {
-        private final String groupId;
-        private final int generationId;
-        private final String memberId;
-        private final Map<String, ByteBuffer> groupAssignment;
 
-        public Builder(String groupId, int generationId, String memberId,
-                       Map<String, ByteBuffer> groupAssignment) {
+        private final SyncGroupRequestData data;
+
+        public Builder(SyncGroupRequestData data) {
             super(ApiKeys.SYNC_GROUP);
-            this.groupId = groupId;
-            this.generationId = generationId;
-            this.memberId = memberId;
-            this.groupAssignment = groupAssignment;
+            this.data = data;
         }
 
         @Override
         public SyncGroupRequest build(short version) {
-            return new SyncGroupRequest(groupId, generationId, memberId, groupAssignment, version);
+            return new SyncGroupRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=SyncGroupRequest").
-                    append(", groupId=").append(groupId).
-                    append(", generationId=").append(generationId).
-                    append(", memberId=").append(memberId).
-                    append(", groupAssignment=").
-                    append(Utils.join(groupAssignment.keySet(), ",")).
-                    append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
-    private final String groupId;
-    private final int generationId;
-    private final String memberId;
-    private final Map<String, ByteBuffer> groupAssignment;
 
-    private SyncGroupRequest(String groupId, int generationId, String memberId,
-                             Map<String, ByteBuffer> groupAssignment, short version) {
+    public final SyncGroupRequestData data;
+
+    public SyncGroupRequest(SyncGroupRequestData data, short version) {
         super(ApiKeys.SYNC_GROUP, version);
-        this.groupId = groupId;
-        this.generationId = generationId;
-        this.memberId = memberId;
-        this.groupAssignment = groupAssignment;
+        this.data = data;
     }
 
     public SyncGroupRequest(Struct struct, short version) {
         super(ApiKeys.SYNC_GROUP, version);
-        this.groupId = struct.get(GROUP_ID);
-        this.generationId = struct.get(GENERATION_ID);
-        this.memberId = struct.get(MEMBER_ID);
-
-        groupAssignment = new HashMap<>();
-
-        for (Object memberDataObj : struct.getArray(GROUP_ASSIGNMENT_KEY_NAME)) {
-            Struct memberData = (Struct) memberDataObj;
-            String memberId = memberData.get(MEMBER_ID);
-            ByteBuffer memberMetadata = memberData.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
-            groupAssignment.put(memberId, memberMetadata);
-        }
+        this.data = new SyncGroupRequestData(struct, version);
     }
 
     @Override
@@ -130,34 +66,30 @@ public class SyncGroupRequest extends AbstractRequest {
         switch (versionId) {
             case 0:
                 return new SyncGroupResponse(
-                        Errors.forException(e),
-                        ByteBuffer.wrap(new byte[]{}));
+                        new SyncGroupResponseData()
+                            .setErrorCode(Errors.forException(e).code())
+                            .setAssignment(new byte[0])
+                       );
             case 1:
             case 2:
                 return new SyncGroupResponse(
-                        throttleTimeMs,
-                        Errors.forException(e),
-                        ByteBuffer.wrap(new byte[]{}));
+                        new SyncGroupResponseData()
+                            .setErrorCode(Errors.forException(e).code())
+                            .setAssignment(new byte[0])
+                            .setThrottleTimeMs(throttleTimeMs)
+                );
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.SYNC_GROUP.latestVersion()));
         }
     }
 
-    public String groupId() {
-        return groupId;
-    }
-
-    public int generationId() {
-        return generationId;
-    }
-
-    public Map<String, ByteBuffer> groupAssignment() {
-        return groupAssignment;
-    }
-
-    public String memberId() {
-        return memberId;
+    public Map<String, ByteBuffer> groupAssignments() {
+        Map<String, ByteBuffer> groupAssignments = new HashMap<>();
+        for (SyncGroupRequestData.SyncGroupRequestAssignment assignment : data.assignments()) {
+            groupAssignments.put(assignment.memberId(), ByteBuffer.wrap(assignment.assignment()));
+        }
+        return groupAssignments;
     }
 
     public static SyncGroupRequest parse(ByteBuffer buffer, short version) {
@@ -166,19 +98,6 @@ public class SyncGroupRequest extends AbstractRequest {
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.SYNC_GROUP.requestSchema(version()));
-        struct.set(GROUP_ID, groupId);
-        struct.set(GENERATION_ID, generationId);
-        struct.set(MEMBER_ID, memberId);
-
-        List<Struct> memberArray = new ArrayList<>();
-        for (Map.Entry<String, ByteBuffer> entries: groupAssignment.entrySet()) {
-            Struct memberData = struct.instance(GROUP_ASSIGNMENT_KEY_NAME);
-            memberData.set(MEMBER_ID, entries.getKey());
-            memberData.set(MEMBER_ASSIGNMENT_KEY_NAME, entries.getValue());
-            memberArray.add(memberData);
-        }
-        struct.set(GROUP_ASSIGNMENT_KEY_NAME, memberArray.toArray());
-        return struct;
+        return data.toStruct(version());
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 2b2fc6f..8c9bad1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -16,100 +16,49 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-
 public class SyncGroupResponse extends AbstractResponse {
-    private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
-
-    private static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema(
-            ERROR_CODE,
-            new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES));
-    private static final Schema SYNC_GROUP_RESPONSE_V1 = new Schema(
-            THROTTLE_TIME_MS,
-            ERROR_CODE,
-            new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema SYNC_GROUP_RESPONSE_V2 = SYNC_GROUP_RESPONSE_V1;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1,
-            SYNC_GROUP_RESPONSE_V2};
-    }
 
-    /**
-     * Possible error codes:
-     *
-     * COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR (16)
-     * ILLEGAL_GENERATION (22)
-     * UNKNOWN_MEMBER_ID (25)
-     * REBALANCE_IN_PROGRESS (27)
-     * GROUP_AUTHORIZATION_FAILED (30)
-     *
-     * NOTE: Currently the coordinator returns REBALANCE_IN_PROGRESS while the coordinator is
-     * loading. On the next protocol bump, we should consider using COORDINATOR_LOAD_IN_PROGRESS
-     * to be consistent with the other APIs.
-     */
+    public final SyncGroupResponseData data;
 
-    private final Errors error;
-    private final int throttleTimeMs;
-    private final ByteBuffer memberState;
-
-    public SyncGroupResponse(Errors error, ByteBuffer memberState) {
-        this(DEFAULT_THROTTLE_TIME, error, memberState);
+    public SyncGroupResponse(SyncGroupResponseData data) {
+        this.data = data;
     }
 
-    public SyncGroupResponse(int throttleTimeMs, Errors error, ByteBuffer memberState) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
-        this.memberState = memberState;
+    public SyncGroupResponse(Struct struct) {
+        short latestVersion = (short) (SyncGroupResponseData.SCHEMAS.length - 1);
+        this.data = new SyncGroupResponseData(struct, latestVersion);
     }
 
-    public SyncGroupResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        this.error = Errors.forCode(struct.get(ERROR_CODE));
-        this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
+    public SyncGroupResponse(Struct struct, short version) {
+        this.data = new SyncGroupResponseData(struct, version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
+        return data.throttleTimeMs();
     }
 
     public Errors error() {
-        return error;
+        return Errors.forCode(data.errorCode());
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
-    }
-
-    public ByteBuffer memberAssignment() {
-        return memberState;
+        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.SYNC_GROUP.responseSchema(version));
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-        struct.set(ERROR_CODE, error.code());
-        struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState);
-        return struct;
+        return data.toStruct(version);
     }
 
     public static SyncGroupResponse parse(ByteBuffer buffer, short version) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 3a4076b..09eb94a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
@@ -1763,7 +1764,11 @@ public class KafkaConsumerTest {
 
     private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
         ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
-        return new SyncGroupResponse(error, buf);
+        return new SyncGroupResponse(
+                new SyncGroupResponseData()
+                        .setErrorCode(error.code())
+                        .setAssignment(Utils.toArray(buf))
+        );
     }
 
     private OffsetFetchResponse offsetResponse(Map<TopicPartition, Long> offsets, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 8d8afd2..449c58f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -793,7 +794,11 @@ public class AbstractCoordinatorTest {
     }
 
     private SyncGroupResponse syncGroupResponse(Errors error) {
-        return new SyncGroupResponse(error, ByteBuffer.allocate(0));
+        return new SyncGroupResponse(
+                new SyncGroupResponseData()
+                        .setErrorCode(error.code())
+                        .setAssignment(new byte[0])
+        );
     }
 
     public static class DummyCoordinator extends AbstractCoordinator {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 4f6e0f1..d8b1252 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
@@ -60,6 +61,7 @@ import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -416,9 +418,9 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.memberId().equals(consumerId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().containsKey(consumerId);
+                return sync.data.memberId().equals(consumerId) &&
+                        sync.data.generationId() == 1 &&
+                        sync.groupAssignments().containsKey(consumerId);
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -452,9 +454,9 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.memberId().equals(consumerId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().containsKey(consumerId);
+                return sync.data.memberId().equals(consumerId) &&
+                        sync.data.generationId() == 1 &&
+                        sync.groupAssignments().containsKey(consumerId);
             }
         }, syncGroupResponse(Arrays.asList(t2p), Errors.NONE));
 
@@ -466,9 +468,9 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.memberId().equals(consumerId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().containsKey(consumerId);
+                return sync.data.memberId().equals(consumerId) &&
+                        sync.data.generationId() == 1 &&
+                        sync.groupAssignments().containsKey(consumerId);
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
 
@@ -508,9 +510,9 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.memberId().equals(consumerId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().containsKey(consumerId);
+                return sync.data.memberId().equals(consumerId) &&
+                        sync.data.generationId() == 1 &&
+                        sync.groupAssignments().containsKey(consumerId);
             }
         }, syncGroupResponse(singletonList(t2p), Errors.NONE));
         assertThrows(IllegalStateException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
@@ -538,9 +540,9 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.memberId().equals(consumerId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().containsKey(consumerId);
+                return sync.data.memberId().equals(consumerId) &&
+                        sync.data.generationId() == 1 &&
+                        sync.groupAssignments().containsKey(consumerId);
             }
         }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
         // expect client to force updating the metadata, if yes gives it both topics
@@ -648,9 +650,9 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.memberId().equals(consumerId) &&
-                    sync.generationId() == 1 &&
-                    sync.groupAssignment().isEmpty();
+                return sync.data.memberId().equals(consumerId) &&
+                    sync.data.generationId() == 1 &&
+                    sync.groupAssignments().isEmpty();
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
 
@@ -722,9 +724,9 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.memberId().equals(consumerId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().isEmpty();
+                return sync.data.memberId().equals(consumerId) &&
+                        sync.data.generationId() == 1 &&
+                        sync.groupAssignments().isEmpty();
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
 
@@ -787,9 +789,9 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.memberId().equals(consumerId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().isEmpty();
+                return sync.data.memberId().equals(consumerId) &&
+                        sync.data.generationId() == 1 &&
+                        sync.groupAssignments().isEmpty();
             }
         }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
         // expect client to force updating the metadata, if yes gives it both topics
@@ -1038,9 +1040,9 @@ public class ConsumerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 SyncGroupRequest sync = (SyncGroupRequest) body;
-                if (sync.memberId().equals(consumerId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().containsKey(consumerId)) {
+                if (sync.data.memberId().equals(consumerId) &&
+                        sync.data.generationId() == 1 &&
+                        sync.groupAssignments().containsKey(consumerId)) {
                     // trigger the metadata update including both topics after the sync group request has been sent
                     Map<String, Integer> topicPartitionCounts = new HashMap<>();
                     topicPartitionCounts.put(topic1, 1);
@@ -2228,7 +2230,11 @@ public class ConsumerCoordinatorTest {
 
     private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
         ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
-        return new SyncGroupResponse(error, buf);
+        return new SyncGroupResponse(
+                new SyncGroupResponseData()
+                        .setErrorCode(error.code())
+                        .setAssignment(Utils.toArray(buf))
+        );
     }
 
     private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> responseData) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 3684025..9f980a6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -32,6 +33,7 @@ import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -224,9 +226,9 @@ public class WorkerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.memberId().equals(consumerId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().containsKey(consumerId);
+                return sync.data.memberId().equals(consumerId) &&
+                        sync.data.generationId() == 1 &&
+                        sync.groupAssignments().containsKey(consumerId);
             }
         }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
                 Collections.<ConnectorTaskId>emptyList(), Errors.NONE));
@@ -261,9 +263,9 @@ public class WorkerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.memberId().equals(memberId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().isEmpty();
+                return sync.data.memberId().equals(memberId) &&
+                        sync.data.generationId() == 1 &&
+                        sync.data.assignments().isEmpty();
             }
         }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(),
                 Collections.singletonList(taskId1x0), Errors.NONE));
@@ -302,9 +304,9 @@ public class WorkerCoordinatorTest {
             @Override
             public boolean matches(AbstractRequest body) {
                 SyncGroupRequest sync = (SyncGroupRequest) body;
-                return sync.memberId().equals(memberId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().isEmpty();
+                return sync.data.memberId().equals(memberId) &&
+                        sync.data.generationId() == 1 &&
+                        sync.data.assignments().isEmpty();
             }
         };
         client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.CONFIG_MISMATCH, "leader", 10L,
@@ -526,7 +528,11 @@ public class WorkerCoordinatorTest {
                                      List<ConnectorTaskId> taskIds, Errors error) {
         ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(assignmentError, leader, LEADER_URL, configOffset, connectorIds, taskIds);
         ByteBuffer buf = ConnectProtocol.serializeAssignment(assignment);
-        return new SyncGroupResponse(error, buf);
+        return new SyncGroupResponse(
+                new SyncGroupResponseData()
+                        .setErrorCode(error.code())
+                        .setAssignment(Utils.toArray(buf))
+        );
     }
 
     private static class MockRebalanceListener implements WorkerRebalanceListener {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 302718d..6bd7174 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1377,17 +1377,27 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     def sendResponseCallback(memberState: Array[Byte], error: Errors) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new SyncGroupResponse(requestThrottleMs, error, ByteBuffer.wrap(memberState)))
+        new SyncGroupResponse(
+          new SyncGroupResponseData()
+            .setErrorCode(error.code)
+            .setAssignment(memberState)
+            .setThrottleTimeMs(requestThrottleMs)
+        ))
     }
 
-    if (!authorize(request.session, Read, Resource(Group, syncGroupRequest.groupId(), LITERAL))) {
+    if (!authorize(request.session, Read, Resource(Group, syncGroupRequest.data.groupId, LITERAL))) {
       sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED)
     } else {
+      val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
+      syncGroupRequest.data.assignments.asScala.foreach { assignment =>
+        assignmentMap += (assignment.memberId -> assignment.assignment)
+      }
+
       groupCoordinator.handleSyncGroup(
-        syncGroupRequest.groupId,
-        syncGroupRequest.generationId,
-        syncGroupRequest.memberId,
-        syncGroupRequest.groupAssignment().asScala.mapValues(Utils.toArray),
+        syncGroupRequest.data.groupId,
+        syncGroupRequest.data.generationId,
+        syncGroupRequest.data.memberId,
+        assignmentMap.result,
         sendResponseCallback
       )
     }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ac8153c..aa6a0dd 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -166,7 +166,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => resp.error),
     ApiKeys.UPDATE_METADATA -> ((resp: requests.UpdateMetadataResponse) => resp.error),
     ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error),
-    ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error),
+    ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => Errors.forCode(resp.data.errorCode())),
     ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => {
       val errorCode = resp.data().groups().asScala.find(g => group.equals(g.groupId())).head.errorCode()
       Errors.forCode(errorCode)
@@ -340,7 +340,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createSyncGroupRequest = {
-    new SyncGroupRequest.Builder(group, 1, "", Map[String, ByteBuffer]().asJava).build()
+    new SyncGroupRequest.Builder(
+      new SyncGroupRequestData()
+        .setGroupId(group)
+        .setGenerationId(1)
+        .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+        .setAssignments(Collections.emptyList())
+    ).build()
   }
 
   private def createDescribeGroupsRequest = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 52ac700..eadcffb 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -306,7 +306,13 @@ class RequestQuotaTest extends BaseRequestTest {
           new LeaveGroupRequest.Builder(new LeaveGroupRequestData().setGroupId("test-leave-group").setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID))
 
         case ApiKeys.SYNC_GROUP =>
-          new SyncGroupRequest.Builder("test-sync-group", 1, "", Map[String, ByteBuffer]().asJava)
+          new SyncGroupRequest.Builder(
+            new SyncGroupRequestData()
+              .setGroupId("test-sync-group")
+              .setGenerationId(1)
+              .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+              .setAssignments(Collections.emptyList())
+          )
 
         case ApiKeys.DESCRIBE_GROUPS =>
           new DescribeGroupsRequest.Builder(new DescribeGroupsRequestData().setGroups(List("test-group").asJava))