You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/06 21:26:45 UTC

[kafka] branch trunk updated: KAFKA-8056; Use automatic RPC generation for FindCoordinator (#6408)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 407bcdf  KAFKA-8056; Use automatic RPC generation for FindCoordinator (#6408)
407bcdf is described below

commit 407bcdf78e06f83f2b358d2cbd96aed348a5c28f
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Mon May 6 22:26:22 2019 +0100

    KAFKA-8056; Use automatic RPC generation for FindCoordinator (#6408)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  23 +++-
 .../consumer/internals/AbstractCoordinator.java    |  15 ++-
 .../producer/internals/TransactionManager.java     |  16 ++-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   8 +-
 .../kafka/common/requests/AbstractResponse.java    |   2 +-
 .../common/requests/FindCoordinatorRequest.java    | 121 ++++++---------------
 .../common/requests/FindCoordinatorResponse.java   | 114 +++++--------------
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  16 ++-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  23 ++--
 .../internals/AbstractCoordinatorTest.java         |   2 +-
 .../internals/ConsumerCoordinatorTest.java         |   2 +-
 .../kafka/clients/producer/KafkaProducerTest.java  |   6 +-
 .../clients/producer/internals/SenderTest.java     |   6 +-
 .../producer/internals/TransactionManagerTest.java |  10 +-
 .../kafka/common/requests/RequestResponseTest.java |  15 ++-
 .../runtime/distributed/WorkerCoordinatorTest.java |  13 +--
 core/src/main/scala/kafka/admin/AdminClient.scala  |   9 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  35 ++++--
 .../kafka/api/AuthorizerIntegrationTest.scala      |   5 +-
 .../integration/kafka/api/ConsumerBounceTest.scala |   6 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   8 +-
 21 files changed, 198 insertions(+), 257 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ffe24ca..a0958e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -68,6 +68,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicRe
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
 import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet;
@@ -103,6 +104,7 @@ import org.apache.kafka.common.requests.DeleteAclsRequest;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.DeleteGroupsRequest;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsRequest;
@@ -2538,8 +2540,11 @@ public class KafkaAdminClient extends AdminClient {
 
             runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
                 @Override
-                AbstractRequest.Builder createRequest(int timeoutMs) {
-                    return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
+                FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
+                    return new FindCoordinatorRequest.Builder(
+                            new FindCoordinatorRequestData()
+                                .setKeyType(CoordinatorType.GROUP.id())
+                                .setKey(groupId));
                 }
 
                 @Override
@@ -2781,8 +2786,11 @@ public class KafkaAdminClient extends AdminClient {
 
         runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
             @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
+            FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
+                return new FindCoordinatorRequest.Builder(
+                        new FindCoordinatorRequestData()
+                            .setKeyType(CoordinatorType.GROUP.id())
+                            .setKey(groupId));
             }
 
             @Override
@@ -2872,8 +2880,11 @@ public class KafkaAdminClient extends AdminClient {
 
             runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
                 @Override
-                AbstractRequest.Builder createRequest(int timeoutMs) {
-                    return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
+                FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
+                    return new FindCoordinatorRequest.Builder(
+                            new FindCoordinatorRequestData()
+                                .setKeyType(CoordinatorType.GROUP.id())
+                                .setKey(groupId));
                 }
 
                 @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 0124338..2cf3910 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.errors.MemberIdRequiredException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -42,6 +43,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
@@ -661,7 +663,10 @@ public abstract class AbstractCoordinator implements Closeable {
         // initiate the group metadata request
         log.debug("Sending FindCoordinator request to broker {}", node);
         FindCoordinatorRequest.Builder requestBuilder =
-                new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
+                new FindCoordinatorRequest.Builder(
+                        new FindCoordinatorRequestData()
+                            .setKeyType(CoordinatorType.GROUP.id())
+                            .setKey(this.groupId));
         return client.send(node, requestBuilder)
                 .compose(new FindCoordinatorResponseHandler());
     }
@@ -679,12 +684,12 @@ public abstract class AbstractCoordinator implements Closeable {
                 synchronized (AbstractCoordinator.this) {
                     // use MAX_VALUE - node.id as the coordinator id to allow separate connections
                     // for the coordinator in the underlying network client layer
-                    int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.node().id();
+                    int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
 
                     AbstractCoordinator.this.coordinator = new Node(
                             coordinatorConnectionId,
-                            findCoordinatorResponse.node().host(),
-                            findCoordinatorResponse.node().port());
+                            findCoordinatorResponse.data().host(),
+                            findCoordinatorResponse.data().port());
                     log.info("Discovered group coordinator {}", coordinator);
                     client.tryConnect(coordinator);
                     heartbeat.resetSessionTimeout();
@@ -693,7 +698,7 @@ public abstract class AbstractCoordinator implements Closeable {
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                 future.raise(new GroupAuthorizationException(groupId));
             } else {
-                log.debug("Group coordinator lookup failed: {}", error.message());
+                log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
                 future.raise(error);
             }
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 98fa153..e24d69b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.DefaultRecordBatch;
@@ -38,6 +39,7 @@ import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
 import org.apache.kafka.common.requests.EndTxnRequest;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
@@ -866,7 +868,10 @@ public class TransactionManager {
                 throw new IllegalStateException("Invalid coordinator type: " + type);
         }
 
-        FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey);
+        FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(
+                new FindCoordinatorRequestData()
+                    .setKeyType(type.id())
+                    .setKey(coordinatorKey));
         enqueueRequest(new FindCoordinatorHandler(builder));
     }
 
@@ -1193,10 +1198,11 @@ public class TransactionManager {
         public void handleResponse(AbstractResponse response) {
             FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response;
             Errors error = findCoordinatorResponse.error();
+            CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType());
 
             if (error == Errors.NONE) {
                 Node node = findCoordinatorResponse.node();
-                switch (builder.coordinatorType()) {
+                switch (coordinatorType) {
                     case GROUP:
                         consumerGroupCoordinator = node;
                         break;
@@ -1209,11 +1215,11 @@ public class TransactionManager {
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                 fatalError(error.exception());
             } else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) {
-                abortableError(new GroupAuthorizationException(builder.coordinatorKey()));
+                abortableError(new GroupAuthorizationException(builder.data().key()));
             } else {
                 fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" +
-                        "unexpected error: %s", builder.coordinatorType(), builder.coordinatorKey(),
-                        findCoordinatorResponse.error().message())));
+                        "unexpected error: %s", coordinatorType, builder.data().key(),
+                        findCoordinatorResponse.data().errorMessage())));
             }
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 8109182..3e2a87a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -83,8 +85,6 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
 import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.FindCoordinatorRequest;
-import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.LeaderAndIsrRequest;
@@ -135,8 +135,8 @@ public enum ApiKeys {
             ControlledShutdownResponseData.SCHEMAS),
     OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequestData.SCHEMAS, OffsetCommitResponseData.SCHEMAS),
     OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), OffsetFetchResponse.schemaVersions()),
-    FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequest.schemaVersions(),
-            FindCoordinatorResponse.schemaVersions()),
+    FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequestData.SCHEMAS,
+        FindCoordinatorResponseData.SCHEMAS),
     JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS),
     HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()),
     LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index c21fa2b..9c747ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -83,7 +83,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case OFFSET_FETCH:
                 return new OffsetFetchResponse(struct);
             case FIND_COORDINATOR:
-                return new FindCoordinatorResponse(struct);
+                return new FindCoordinatorResponse(struct, version);
             case JOIN_GROUP:
                 return new JoinGroupResponse(struct, version);
             case HEARTBEAT:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
index 2d44ab3..0e72843 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -18,122 +18,64 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT8;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class FindCoordinatorRequest extends AbstractRequest {
-    private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
-    private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";
-
-    private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID);
-
-    private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
-            new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " +
-                            "for transactional producers, this is the transactional id)"),
-            new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)"));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema FIND_COORDINATOR_REQUEST_V2 = FIND_COORDINATOR_REQUEST_V1;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1, FIND_COORDINATOR_REQUEST_V2};
-    }
 
     public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> {
-        private final String coordinatorKey;
-        private final CoordinatorType coordinatorType;
-        private final short minVersion;
+        private final FindCoordinatorRequestData data;
 
-        public Builder(CoordinatorType coordinatorType, String coordinatorKey) {
+        public Builder(FindCoordinatorRequestData data) {
             super(ApiKeys.FIND_COORDINATOR);
-            this.coordinatorType = coordinatorType;
-            this.coordinatorKey = coordinatorKey;
-            this.minVersion = coordinatorType == CoordinatorType.TRANSACTION ? (short) 1 : (short) 0;
+            this.data = data;
         }
 
         @Override
         public FindCoordinatorRequest build(short version) {
-            if (version < minVersion)
+            if (version < 1 && data.keyType() == CoordinatorType.TRANSACTION.id()) {
                 throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
-                        "because we require features supported only in " + minVersion + " or later.");
-            return new FindCoordinatorRequest(coordinatorType, coordinatorKey, version);
-        }
-
-        public String coordinatorKey() {
-            return coordinatorKey;
-        }
-
-        public CoordinatorType coordinatorType() {
-            return coordinatorType;
+                        "because we require features supported only in 2 or later.");
+            }
+            return new FindCoordinatorRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=FindCoordinatorRequest, coordinatorKey=");
-            bld.append(coordinatorKey);
-            bld.append(", coordinatorType=");
-            bld.append(coordinatorType);
-            bld.append(")");
-            return bld.toString();
+            return data.toString();
+        }
+
+        public FindCoordinatorRequestData data() {
+            return data;
         }
     }
 
-    private final String coordinatorKey;
-    private final CoordinatorType coordinatorType;
+    private final FindCoordinatorRequestData data;
 
-    private FindCoordinatorRequest(CoordinatorType coordinatorType, String coordinatorKey, short version) {
+    private FindCoordinatorRequest(FindCoordinatorRequestData data, short version) {
         super(ApiKeys.FIND_COORDINATOR, version);
-        this.coordinatorType = coordinatorType;
-        this.coordinatorKey = coordinatorKey;
+        this.data = data;
     }
 
     public FindCoordinatorRequest(Struct struct, short version) {
         super(ApiKeys.FIND_COORDINATOR, version);
-
-        if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))
-            this.coordinatorType = CoordinatorType.forId(struct.getByte(COORDINATOR_TYPE_KEY_NAME));
-        else
-            this.coordinatorType = CoordinatorType.GROUP;
-        if (struct.hasField(GROUP_ID))
-            this.coordinatorKey = struct.get(GROUP_ID);
-        else
-            this.coordinatorKey = struct.getString(COORDINATOR_KEY_KEY_NAME);
+        this.data = new FindCoordinatorRequestData(struct, version);
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        short versionId = version();
-        switch (versionId) {
-            case 0:
-                return new FindCoordinatorResponse(Errors.forException(e), Node.noNode());
-            case 1:
-            case 2:
-                return new FindCoordinatorResponse(throttleTimeMs, Errors.forException(e), Node.noNode());
-
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ApiKeys.FIND_COORDINATOR.latestVersion()));
+        FindCoordinatorResponseData response = new FindCoordinatorResponseData();
+        if (version() >= 2) {
+            response.setThrottleTimeMs(throttleTimeMs);
         }
-    }
-
-    public String coordinatorKey() {
-        return coordinatorKey;
-    }
-
-    public CoordinatorType coordinatorType() {
-        return coordinatorType;
+        Errors error = Errors.forException(e);
+        return FindCoordinatorResponse.prepareResponse(error, Node.noNode());
     }
 
     public static FindCoordinatorRequest parse(ByteBuffer buffer, short version) {
@@ -142,14 +84,11 @@ public class FindCoordinatorRequest extends AbstractRequest {
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.requestSchema(version()));
-        if (struct.hasField(GROUP_ID))
-            struct.set(GROUP_ID, coordinatorKey);
-        else
-            struct.set(COORDINATOR_KEY_KEY_NAME, coordinatorKey);
-        if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))
-            struct.set(COORDINATOR_TYPE_KEY_NAME, coordinatorType.id);
-        return struct;
+        return data.toStruct(version());
+    }
+
+    public FindCoordinatorRequestData data() {
+        return data;
     }
 
     public enum CoordinatorType {
@@ -161,6 +100,10 @@ public class FindCoordinatorRequest extends AbstractRequest {
             this.id = id;
         }
 
+        public byte id() {
+            return id;
+        }
+
         public static CoordinatorType forId(byte id) {
             switch (id) {
                 case 0:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index bc7f654..c880408 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -17,53 +17,17 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class FindCoordinatorResponse extends AbstractResponse {
-    private static final String COORDINATOR_KEY_NAME = "coordinator";
-
-    // coordinator level field names
-    private static final String NODE_ID_KEY_NAME = "node_id";
-    private static final String HOST_KEY_NAME = "host";
-    private static final String PORT_KEY_NAME = "port";
-
-    private static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema(
-            new Field(NODE_ID_KEY_NAME, INT32, "The broker id."),
-            new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
-            new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."));
-
-    private static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema(
-            ERROR_CODE,
-            new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator " +
-                    "for a consumer group."));
-
-    private static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
-            THROTTLE_TIME_MS,
-            ERROR_CODE,
-            ERROR_MESSAGE,
-            new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator"));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema FIND_COORDINATOR_RESPONSE_V2 = FIND_COORDINATOR_RESPONSE_V1;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1, FIND_COORDINATOR_RESPONSE_V2};
-    }
 
     /**
      * Possible error codes:
@@ -75,88 +39,68 @@ public class FindCoordinatorResponse extends AbstractResponse {
      * TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53)
      */
 
+    private final FindCoordinatorResponseData data;
 
-    private final int throttleTimeMs;
-    private final String errorMessage;
-    private final Errors error;
-    private final Node node;
-
-    public FindCoordinatorResponse(Errors error, Node node) {
-        this(DEFAULT_THROTTLE_TIME, error, node);
+    public FindCoordinatorResponse(FindCoordinatorResponseData data) {
+        this.data = data;
     }
 
-    public FindCoordinatorResponse(int throttleTimeMs, Errors error, Node node) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
-        this.node = node;
-        this.errorMessage = null;
+    public FindCoordinatorResponse(Struct struct, short version) {
+        this.data = new FindCoordinatorResponseData(struct, version);
     }
 
-    public FindCoordinatorResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        error = Errors.forCode(struct.get(ERROR_CODE));
-        errorMessage = struct.getOrElse(ERROR_MESSAGE, null);
+    public FindCoordinatorResponseData data() {
+        return data;
+    }
 
-        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
-        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
-        String host = broker.getString(HOST_KEY_NAME);
-        int port = broker.getInt(PORT_KEY_NAME);
-        node = new Node(nodeId, host, port);
+    public Node node() {
+        return new Node(data.nodeId(), data.host(), data.port());
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
+        return data.throttleTimeMs();
     }
 
     public boolean hasError() {
-        return this.error != Errors.NONE;
+        return error() != Errors.NONE;
     }
 
     public Errors error() {
-        return error;
+        return Errors.forCode(data.errorCode());
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
-    }
-
-    public Node node() {
-        return node;
+        return Collections.singletonMap(error(), 1);
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.responseSchema(version));
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-        struct.set(ERROR_CODE, error.code());
-        struct.setIfExists(ERROR_MESSAGE, errorMessage);
-
-        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
-        coordinator.set(NODE_ID_KEY_NAME, node.id());
-        coordinator.set(HOST_KEY_NAME, node.host());
-        coordinator.set(PORT_KEY_NAME, node.port());
-        struct.set(COORDINATOR_KEY_NAME, coordinator);
-        return struct;
+        return data.toStruct(version);
     }
 
     public static FindCoordinatorResponse parse(ByteBuffer buffer, short version) {
-        return new FindCoordinatorResponse(ApiKeys.FIND_COORDINATOR.responseSchema(version).read(buffer));
+        return new FindCoordinatorResponse(ApiKeys.FIND_COORDINATOR.responseSchema(version).read(buffer), version);
     }
 
     @Override
     public String toString() {
-        return "FindCoordinatorResponse(" +
-                "throttleTimeMs=" + throttleTimeMs +
-                ", errorMessage='" + errorMessage + '\'' +
-                ", error=" + error +
-                ", node=" + node +
-                ')';
+        return data.toString();
     }
 
     @Override
     public boolean shouldClientThrottle(short version) {
         return version >= 2;
     }
+
+    public static FindCoordinatorResponse prepareResponse(Errors error, Node node) {
+        FindCoordinatorResponseData data = new FindCoordinatorResponseData();
+        data.setErrorCode(error.code())
+            .setErrorMessage(error.message())
+            .setNodeId(node.id())
+            .setHost(node.host())
+            .setPort(node.port());
+        return new FindCoordinatorResponse(data);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 1367b94..687dad2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -236,6 +236,10 @@ public class KafkaAdminClientTest {
         return new DeleteTopicsResponse(data);
     }
 
+    private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors error, Node node) {
+        return FindCoordinatorResponse.prepareResponse(error, node);
+    }
+
     /**
      * Test that the client properly times out when we don't receive any metadata.
      */
@@ -1072,7 +1076,7 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             DescribeGroupsResponseData data = new DescribeGroupsResponseData();
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
@@ -1139,7 +1143,7 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
             TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
@@ -1183,10 +1187,10 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             //Retriable FindCoordinatorResponse errors should be retried
-            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,  Node.noNode()));
-            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,  Node.noNode()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
 
-            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
             final Map<String, Errors> response = new HashMap<>();
             response.put("group-0", Errors.NONE);
@@ -1198,7 +1202,7 @@ public class KafkaAdminClientTest {
             assertNull(results.get());
 
             //should throw error for non-retriable errors
-            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,  Node.noNode()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED,  Node.noNode()));
 
             final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
             TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8f23d41..ccd9e94 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -580,7 +580,7 @@ public class KafkaConsumerTest {
                 true, groupId, groupInstanceId);
         consumer.assign(singletonList(tp0));
 
-        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // lookup committed offset and find nothing
@@ -604,7 +604,7 @@ public class KafkaConsumerTest {
                 true, groupId, groupInstanceId);
         consumer.assign(singletonList(tp0));
 
-        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 539L), Errors.NONE), coordinator);
@@ -629,7 +629,7 @@ public class KafkaConsumerTest {
                 true, groupId, groupInstanceId);
         consumer.assign(singletonList(tp0));
 
-        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator);
@@ -679,7 +679,7 @@ public class KafkaConsumerTest {
         consumer.assign(singletonList(tp0));
 
         // lookup coordinator
-        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // fetch offset for one topic
@@ -1116,7 +1116,7 @@ public class KafkaConsumerTest {
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         // lookup coordinator
-        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1172,7 +1172,7 @@ public class KafkaConsumerTest {
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
 
         // lookup coordinator
-        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1226,7 +1226,7 @@ public class KafkaConsumerTest {
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         // lookup coordinator
-        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1420,7 +1420,7 @@ public class KafkaConsumerTest {
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
-        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
 
@@ -1465,7 +1465,7 @@ public class KafkaConsumerTest {
         client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator, true);
 
         // should try and find the new coordinator
-        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
 
         // rejoin group
         client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
@@ -1663,7 +1663,7 @@ public class KafkaConsumerTest {
     private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
-            client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+            client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
             coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
         }
 
@@ -1691,7 +1691,7 @@ public class KafkaConsumerTest {
     private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
-            client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+            client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
             coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
         }
 
@@ -1797,7 +1797,6 @@ public class KafkaConsumerTest {
         return new ListOffsetResponse(partitionData);
     }
 
-
     private FetchResponse<MemoryRecords> fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> tpResponses = new LinkedHashMap<>();
         for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index c3f0ff5..5aaf476 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -768,7 +768,7 @@ public class AbstractCoordinatorTest {
     }
 
     private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
-        return new FindCoordinatorResponse(error, node);
+        return FindCoordinatorResponse.prepareResponse(error, node);
     }
 
     private HeartbeatResponse heartbeatResponse(Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3f9d89f..a60316f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2184,7 +2184,7 @@ public class ConsumerCoordinatorTest {
     }
 
     private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
-        return new FindCoordinatorResponse(error, node);
+        return FindCoordinatorResponse.prepareResponse(error, node);
     }
 
     private HeartbeatResponse heartbeatResponse(Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index d6b3d74..170b75e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -662,7 +662,7 @@ public class KafkaProducerTest {
         Node node = metadata.fetch().nodes().get(0);
         client.throttle(node, 5000);
 
-        client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
 
         try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
@@ -855,7 +855,7 @@ public class KafkaProducerTest {
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         CountDownLatch assertionDoneLatch = new CountDownLatch(1);
-        client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
         executorService.submit(() -> {
             assertThrows(KafkaException.class, producer::initTransactions);
             assertionDoneLatch.countDown();
@@ -884,7 +884,7 @@ public class KafkaProducerTest {
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         CountDownLatch assertionDoneLatch = new CountDownLatch(1);
-        client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
         executorService.submit(() -> {
             assertThrows(KafkaException.class, producer::initTransactions);
             assertionDoneLatch.countDown();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 5e4e31e..04197d8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -265,7 +265,6 @@ public class SenderTest {
      * Send multiple requests. Verify that the client side quota metrics have the right values
      */
     @Test
-    @SuppressWarnings("deprecation")
     public void testQuotaMetrics() throws Exception {
         MockSelector selector = new MockSelector(time);
         Sensor throttleTimeSensor = Sender.throttleTimeSensor(this.senderMetricsRegistry);
@@ -1811,7 +1810,6 @@ public class SenderTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
@@ -1904,7 +1902,6 @@ public class SenderTest {
         testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
     }
 
-    @SuppressWarnings("deprecation")
     private void testSplitBatchAndSend(TransactionManager txnManager,
                                        ProducerIdAndEpoch producerIdAndEpoch,
                                        TopicPartition tp) throws Exception {
@@ -2473,7 +2470,8 @@ public class SenderTest {
     }
 
     private void prepareFindCoordinatorResponse(Errors error) {
-        client.prepareResponse(new FindCoordinatorResponse(error, metadata.fetch().nodes().get(0)));
+        Node node = metadata.fetch().nodes().get(0);
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(error, node));
     }
 
     private void prepareInitProducerResponse(Errors error, long producerId, short producerEpoch) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 2d3487f..ca03b76 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -687,8 +687,8 @@ public class TransactionManagerTest {
         transactionManager.initializeTransactions();
         client.prepareUnsupportedVersionResponse(body -> {
             FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
-            assertEquals(findCoordinatorRequest.coordinatorType(), CoordinatorType.TRANSACTION);
-            assertEquals(findCoordinatorRequest.coordinatorKey(), transactionalId);
+            assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()), CoordinatorType.TRANSACTION);
+            assertEquals(findCoordinatorRequest.data().key(), transactionalId);
             return true;
         });
 
@@ -2381,10 +2381,10 @@ public class TransactionManagerTest {
                                                 final String coordinatorKey) {
         client.prepareResponse(body -> {
             FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
-            assertEquals(findCoordinatorRequest.coordinatorType(), coordinatorType);
-            assertEquals(findCoordinatorRequest.coordinatorKey(), coordinatorKey);
+            assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()), coordinatorType);
+            assertEquals(findCoordinatorRequest.data().key(), coordinatorKey);
             return true;
-        }, new FindCoordinatorResponse(error, brokerNode), shouldDisconnect);
+        }, FindCoordinatorResponse.prepareResponse(error, brokerNode), shouldDisconnect);
     }
 
     private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b6a2dad..61ad3b3 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -52,6 +52,7 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPar
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -84,6 +85,7 @@ import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
@@ -445,7 +447,10 @@ public class RequestResponseTest {
 
     @Test(expected = UnsupportedVersionException.class)
     public void cannotUseFindCoordinatorV0ToFindTransactionCoordinator() {
-        FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
+        FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(
+                new FindCoordinatorRequestData()
+                    .setKeyType(CoordinatorType.TRANSACTION.id)
+                    .setKey("foobar"));
         builder.build((short) 0);
     }
 
@@ -697,12 +702,16 @@ public class RequestResponseTest {
     }
 
     private FindCoordinatorRequest createFindCoordinatorRequest(int version) {
-        return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
+        return new FindCoordinatorRequest.Builder(
+                new FindCoordinatorRequestData()
+                    .setKeyType(CoordinatorType.GROUP.id())
+                    .setKey("test-group"))
                 .build((short) version);
     }
 
     private FindCoordinatorResponse createFindCoordinatorResponse() {
-        return new FindCoordinatorResponse(Errors.NONE, new Node(10, "host1", 2014));
+        Node node = new Node(10, "host1", 2014);
+        return FindCoordinatorResponse.prepareResponse(Errors.NONE, node);
     }
 
     private FetchRequest createFetchRequest(int version, FetchMetadata metadata, List<TopicPartition> toForget) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index c78c7a4..cda5f61 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -212,7 +212,7 @@ public class WorkerCoordinatorTest {
 
         final String consumerId = "leader";
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         // normal join group
@@ -252,7 +252,7 @@ public class WorkerCoordinatorTest {
 
         final String memberId = "member";
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         // normal join group
@@ -293,7 +293,7 @@ public class WorkerCoordinatorTest {
 
         final String memberId = "member";
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         // config mismatch results in assignment error
@@ -324,7 +324,7 @@ public class WorkerCoordinatorTest {
 
         PowerMock.replayAll();
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         // join the group once
@@ -488,11 +488,6 @@ public class WorkerCoordinatorTest {
         PowerMock.verifyAll();
     }
 
-
-    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
-        return new FindCoordinatorResponse(error, node);
-    }
-
     private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,
                                            Map<String, Long> configOffsets, Errors error) {
         List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 17716d3..c78a451 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -28,7 +28,8 @@ import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException}
 import org.apache.kafka.common.internals.ClusterResourceListeners
-import org.apache.kafka.common.message.{DescribeGroupsRequestData, DescribeGroupsResponseData}
+import org.apache.kafka.common.message.{DescribeGroupsRequestData, DescribeGroupsResponseData, FindCoordinatorRequestData}
+
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.Selector
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -41,6 +42,7 @@ import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
 
 /**
   * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
@@ -108,7 +110,10 @@ class AdminClient(val time: Time,
   }
 
   def findCoordinator(groupId: String, timeoutMs: Long = 0): Node = {
-    val requestBuilder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId)
+    val requestBuilder = new FindCoordinatorRequest.Builder(
+        new FindCoordinatorRequestData()
+          .setKeyType(CoordinatorType.GROUP.id)
+          .setKey(groupId))
 
     def sendRequest: Try[FindCoordinatorResponse] =
       Try(sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse])
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4e4c225..34ed7d7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -57,6 +57,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.PatternType.LITERAL
@@ -1171,22 +1172,22 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleFindCoordinatorRequest(request: RequestChannel.Request) {
     val findCoordinatorRequest = request.body[FindCoordinatorRequest]
 
-    if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
-        !authorize(request.session, Describe, Resource(Group, findCoordinatorRequest.coordinatorKey, LITERAL)))
+    if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id &&
+        !authorize(request.session, Describe, Resource(Group, findCoordinatorRequest.data.key, LITERAL)))
       sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
-    else if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION &&
-        !authorize(request.session, Describe, Resource(TransactionalId, findCoordinatorRequest.coordinatorKey, LITERAL)))
+    else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id &&
+        !authorize(request.session, Describe, Resource(TransactionalId, findCoordinatorRequest.data.key, LITERAL)))
       sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
       // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match {
-        case FindCoordinatorRequest.CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
+      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+        case CoordinatorType.GROUP =>
+          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
           val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
           (partition, metadata)
 
-        case FindCoordinatorRequest.CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
+        case CoordinatorType.TRANSACTION =>
+          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
           val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
           (partition, metadata)
 
@@ -1195,8 +1196,18 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
+        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
+          new FindCoordinatorResponse(
+              new FindCoordinatorResponseData()
+                .setErrorCode(error.code)
+                .setErrorMessage(error.message)
+                .setNodeId(node.id)
+                .setHost(node.host)
+                .setPort(node.port)
+                .setThrottleTimeMs(requestThrottleMs))
+        }
         val responseBody = if (topicMetadata.error != Errors.NONE) {
-          new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
         } else {
           val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
             .find(_.partition == partition)
@@ -1205,9 +1216,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
           coordinatorEndpoint match {
             case Some(endpoint) if !endpoint.isEmpty =>
-              new FindCoordinatorResponse(requestThrottleMs, Errors.NONE, endpoint)
+              createFindCoordinatorResponse(Errors.NONE, endpoint)
             case _ =>
-              new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
           }
         }
         trace("Sending FindCoordinator response %s for correlation id %d to client %s."
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e7a094f..ee1c02b 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -302,7 +302,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createFindCoordinatorRequest = {
-    new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build()
+    new FindCoordinatorRequest.Builder(
+        new FindCoordinatorRequestData()
+          .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id)
+          .setKey(group)).build()
   }
 
   private def createUpdateMetadataRequest = {
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 385eb8f..ae6fc00 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.GroupMaxSizeReachedException
+import org.apache.kafka.common.message.FindCoordinatorRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
 import org.junit.Assert._
@@ -254,7 +255,10 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
   }
 
   private def findCoordinator(group: String): Int = {
-    val request = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build()
+    val request = new FindCoordinatorRequest.Builder(
+        new FindCoordinatorRequestData()
+          .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id)
+          .setKey(group)).build()
     var nodeId = -1
     TestUtils.waitUntilTrue(() => {
       val resp = connectAndSend(request, ApiKeys.FIND_COORDINATOR)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index d9c8bb5..b4c1268 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -274,7 +274,10 @@ class RequestQuotaTest extends BaseRequestTest {
           new OffsetFetchRequest.Builder("test-group", List(tp).asJava)
 
         case ApiKeys.FIND_COORDINATOR =>
-          new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
+          new FindCoordinatorRequest.Builder(
+              new FindCoordinatorRequestData()
+                .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id)
+                .setKey("test-group"))
 
         case ApiKeys.JOIN_GROUP =>
           new JoinGroupRequest.Builder(
@@ -489,7 +492,8 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.OFFSET_COMMIT =>
         new OffsetCommitResponse(response, ApiKeys.OFFSET_COMMIT.latestVersion).throttleTimeMs
       case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs
-      case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs
+      case ApiKeys.FIND_COORDINATOR =>
+        new FindCoordinatorResponse(response, ApiKeys.FIND_COORDINATOR.latestVersion).throttleTimeMs
       case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs
       case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs
       case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs