You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/16 20:09:15 UTC

[kafka] branch trunk updated: KAFKA-8256; Replace Heartbeat request/response with automated protocol (#6691)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 855f899  KAFKA-8256; Replace Heartbeat request/response with automated protocol (#6691)
855f899 is described below

commit 855f899bb523f3b334f711926a7db4cc75ebb4b4
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Thu May 16 21:08:49 2019 +0100

    KAFKA-8256; Replace Heartbeat request/response with automated protocol (#6691)
    
    Reviewers: Boyang Chen <bc...@outlook.com>, Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/AbstractCoordinator.java    |  6 +-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  6 +-
 .../kafka/common/requests/AbstractResponse.java    |  2 +-
 .../kafka/common/requests/HeartbeatRequest.java    | 82 +++++-----------------
 .../kafka/common/requests/HeartbeatResponse.java   | 51 ++++----------
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  7 +-
 .../internals/AbstractCoordinatorTest.java         |  3 +-
 .../internals/ConsumerCoordinatorTest.java         |  3 +-
 .../internals/ConsumerNetworkClientTest.java       |  9 ++-
 .../kafka/common/requests/RequestResponseTest.java |  9 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 18 +++--
 .../kafka/api/AuthorizerIntegrationTest.scala      |  2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  4 +-
 13 files changed, 76 insertions(+), 126 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 63a7b7c..a1e15f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -891,7 +892,10 @@ public abstract class AbstractCoordinator implements Closeable {
     synchronized RequestFuture<Void> sendHeartbeatRequest() {
         log.debug("Sending Heartbeat request to coordinator {}", coordinator);
         HeartbeatRequest.Builder requestBuilder =
-                new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId);
+                new HeartbeatRequest.Builder(new HeartbeatRequestData()
+                        .setGroupId(groupId)
+                        .setGenerationid(this.generation.generationId)
+                        .setMemberId(this.generation.memberId));
         return client.send(coordinator, requestBuilder)
                 .compose(new HeartbeatResponseHandler());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index a7e3757..6a16578 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -28,6 +28,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
@@ -87,8 +89,6 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
 import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.HeartbeatRequest;
-import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.LeaderAndIsrRequest;
 import org.apache.kafka.common.requests.LeaderAndIsrResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
@@ -138,7 +138,7 @@ public enum ApiKeys {
     FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequestData.SCHEMAS,
         FindCoordinatorResponseData.SCHEMAS),
     JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS),
-    HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()),
+    HEARTBEAT(12, "Heartbeat", HeartbeatRequestData.SCHEMAS, HeartbeatResponseData.SCHEMAS),
     LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
     SYNC_GROUP(14, "SyncGroup", SyncGroupRequestData.SCHEMAS, SyncGroupResponseData.SCHEMAS),
     DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequestData.SCHEMAS,
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index a7f5a38..32402e4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -87,7 +87,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case JOIN_GROUP:
                 return new JoinGroupResponse(struct, version);
             case HEARTBEAT:
-                return new HeartbeatResponse(struct);
+                return new HeartbeatResponse(struct, version);
             case LEAVE_GROUP:
                 return new LeaveGroupResponse(struct, version);
             case SYNC_GROUP:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 9a13147..f78cafe 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -16,120 +16,72 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
-import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
 
 public class HeartbeatRequest extends AbstractRequest {
-    private static final Schema HEARTBEAT_REQUEST_V0 = new Schema(
-            GROUP_ID,
-            GENERATION_ID,
-            MEMBER_ID);
-
-    /* v1 request is the same as v0. Throttle time has been added to response */
-    private static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0;
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema HEARTBEAT_REQUEST_V2 = HEARTBEAT_REQUEST_V1;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1,
-            HEARTBEAT_REQUEST_V2};
-    }
 
     public static class Builder extends AbstractRequest.Builder<HeartbeatRequest> {
-        private final String groupId;
-        private final int groupGenerationId;
-        private final String memberId;
+        private final HeartbeatRequestData data;
 
-        public Builder(String groupId, int groupGenerationId, String memberId) {
+        public Builder(HeartbeatRequestData data) {
             super(ApiKeys.HEARTBEAT);
-            this.groupId = groupId;
-            this.groupGenerationId = groupGenerationId;
-            this.memberId = memberId;
+            this.data = data;
         }
 
         @Override
         public HeartbeatRequest build(short version) {
-            return new HeartbeatRequest(groupId, groupGenerationId, memberId, version);
+            return new HeartbeatRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=HeartbeatRequest").
-                append(", groupId=").append(groupId).
-                append(", groupGenerationId=").append(groupGenerationId).
-                append(", memberId=").append(memberId).
-                append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    private final String groupId;
-    private final int groupGenerationId;
-    private final String memberId;
+    public final HeartbeatRequestData data;
 
-    private HeartbeatRequest(String groupId, int groupGenerationId, String memberId, short version) {
+    private HeartbeatRequest(HeartbeatRequestData data, short version) {
         super(ApiKeys.HEARTBEAT, version);
-        this.groupId = groupId;
-        this.groupGenerationId = groupGenerationId;
-        this.memberId = memberId;
+        this.data = data;
     }
 
     public HeartbeatRequest(Struct struct, short version) {
         super(ApiKeys.HEARTBEAT, version);
-        groupId = struct.get(GROUP_ID);
-        groupGenerationId = struct.get(GENERATION_ID);
-        memberId = struct.get(MEMBER_ID);
+        this.data = new HeartbeatRequestData(struct, version);
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        HeartbeatResponseData response = new HeartbeatResponseData();
+        response.setErrorCode(Errors.forException(e).code());
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new HeartbeatResponse(Errors.forException(e));
+                return new HeartbeatResponse(response);
             case 1:
             case 2:
-                return new HeartbeatResponse(throttleTimeMs, Errors.forException(e));
+                response.setThrottleTimeMs(throttleTimeMs);
+                return new HeartbeatResponse(response);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.HEARTBEAT.latestVersion()));
         }
     }
 
-    public String groupId() {
-        return groupId;
-    }
-
-    public int groupGenerationId() {
-        return groupGenerationId;
-    }
-
-    public String memberId() {
-        return memberId;
-    }
-
     public static HeartbeatRequest parse(ByteBuffer buffer, short version) {
         return new HeartbeatRequest(ApiKeys.HEARTBEAT.parseRequest(version, buffer), version);
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.HEARTBEAT.requestSchema(version()));
-        struct.set(GROUP_ID, groupId);
-        struct.set(GENERATION_ID, groupGenerationId);
-        struct.set(MEMBER_ID, memberId);
-        return struct;
+        return data.toStruct(version());
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index efe2ed8..cc36a20 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -16,35 +16,18 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 
 public class HeartbeatResponse extends AbstractResponse {
 
-    private static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(
-            ERROR_CODE);
-    private static final Schema HEARTBEAT_RESPONSE_V1 = new Schema(
-            THROTTLE_TIME_MS,
-            ERROR_CODE);
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema HEARTBEAT_RESPONSE_V2 = HEARTBEAT_RESPONSE_V1;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1,
-            HEARTBEAT_RESPONSE_V2};
-    }
-
     /**
      * Possible error codes:
      *
@@ -55,47 +38,37 @@ public class HeartbeatResponse extends AbstractResponse {
      * REBALANCE_IN_PROGRESS (27)
      * GROUP_AUTHORIZATION_FAILED (30)
      */
-    private final Errors error;
-    private final int throttleTimeMs;
-
-    public HeartbeatResponse(Errors error) {
-        this(DEFAULT_THROTTLE_TIME, error);
-    }
+    private final HeartbeatResponseData data;
 
-    public HeartbeatResponse(int throttleTimeMs, Errors error) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
+    public HeartbeatResponse(HeartbeatResponseData data) {
+        this.data = data;
     }
 
-    public HeartbeatResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
-        error = Errors.forCode(struct.get(ERROR_CODE));
+    public HeartbeatResponse(Struct struct, short version) {
+        this.data = new HeartbeatResponseData(struct, version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
+        return data.throttleTimeMs();
     }
 
     public Errors error() {
-        return error;
+        return Errors.forCode(data.errorCode());
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
+        return Collections.singletonMap(error(), 1);
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.HEARTBEAT.responseSchema(version));
-        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
-        struct.set(ERROR_CODE, error.code());
-        return struct;
+        return data.toStruct(version);
     }
 
     public static HeartbeatResponse parse(ByteBuffer buffer, short version) {
-        return new HeartbeatResponse(ApiKeys.HEARTBEAT.parseResponse(version, buffer));
+        return new HeartbeatResponse(ApiKeys.HEARTBEAT.parseResponse(version, buffer), version);
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 09eb94a..606b711 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
@@ -1440,7 +1441,9 @@ public class KafkaConsumerTest {
             public boolean matches(AbstractRequest body) {
                 return true;
             }
-        }, new HeartbeatResponse(Errors.REBALANCE_IN_PROGRESS), coordinator);
+        }, new HeartbeatResponse(
+                new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())),
+                coordinator);
 
         // join group
         final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(singletonList(topic)));
@@ -1713,7 +1716,7 @@ public class KafkaConsumerTest {
                 heartbeatReceived.set(true);
                 return true;
             }
-        }, new HeartbeatResponse(Errors.NONE), coordinator);
+        }, new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(Errors.NONE.code())), coordinator);
         return heartbeatReceived;
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 449c58f..c8be163 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -773,7 +774,7 @@ public class AbstractCoordinatorTest {
     }
 
     private HeartbeatResponse heartbeatResponse(Errors error) {
-        return new HeartbeatResponse(error);
+        return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code()));
     }
 
     private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index d8b1252..958e440 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
@@ -2189,7 +2190,7 @@ public class ConsumerCoordinatorTest {
     }
 
     private HeartbeatResponse heartbeatResponse(Errors error) {
-        return new HeartbeatResponse(error);
+        return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code()));
     }
 
     private JoinGroupResponse joinGroupLeaderResponse(int generationId,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 1b7f8fb..f3750aa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
@@ -378,11 +380,14 @@ public class ConsumerNetworkClientTest {
     }
 
     private HeartbeatRequest.Builder heartbeat() {
-        return new HeartbeatRequest.Builder("group", 1, "memberId");
+        return new HeartbeatRequest.Builder(new HeartbeatRequestData()
+                .setGroupId("group")
+                .setGenerationid(1)
+                .setMemberId("memberId"));
     }
 
     private HeartbeatResponse heartbeatResponse(Errors error) {
-        return new HeartbeatResponse(error);
+        return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code()));
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 43057a6..d9a9ed1 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -53,6 +53,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -772,11 +774,14 @@ public class RequestResponseTest {
     }
 
     private HeartbeatRequest createHeartBeatRequest() {
-        return new HeartbeatRequest.Builder("group1", 1, "consumer1").build();
+        return new HeartbeatRequest.Builder(new HeartbeatRequestData()
+                .setGroupId("group1")
+                .setGenerationid(1)
+                .setMemberId("consumer1")).build();
     }
 
     private HeartbeatResponse createHeartBeatResponse() {
-        return new HeartbeatResponse(Errors.NONE);
+        return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(Errors.NONE.code()));
     }
 
     private JoinGroupRequest createJoinGroupRequest(int version) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6bd7174..ef921bf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1424,7 +1424,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     // the callback for sending a heartbeat response
     def sendResponseCallback(error: Errors) {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val response = new HeartbeatResponse(requestThrottleMs, error)
+        val response = new HeartbeatResponse(
+            new HeartbeatResponseData()
+              .setThrottleTimeMs(requestThrottleMs)
+              .setErrorCode(error.code))
         trace("Sending heartbeat response %s for correlation id %d to client %s."
           .format(response, request.header.correlationId, request.header.clientId))
         response
@@ -1432,15 +1435,18 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(request, createResponse)
     }
 
-    if (!authorize(request.session, Read, Resource(Group, heartbeatRequest.groupId, LITERAL))) {
+    if (!authorize(request.session, Read, Resource(Group, heartbeatRequest.data.groupId, LITERAL))) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new HeartbeatResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
+        new HeartbeatResponse(
+            new HeartbeatResponseData()
+              .setThrottleTimeMs(requestThrottleMs)
+              .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)))
     } else {
       // let the coordinator to handle heartbeat
       groupCoordinator.handleHeartbeat(
-        heartbeatRequest.groupId,
-        heartbeatRequest.memberId,
-        heartbeatRequest.groupGenerationId,
+        heartbeatRequest.data.groupId,
+        heartbeatRequest.data.memberId,
+        heartbeatRequest.data.generationid,
         sendResponseCallback)
     }
   }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index aa6a0dd..fa5e91a 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -380,7 +380,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ).build()
   }
 
-  private def heartbeatRequest = new HeartbeatRequest.Builder(group, 1, "").build()
+  private def heartbeatRequest = new HeartbeatRequest.Builder(new HeartbeatRequestData().setGroupId(group).setGenerationid(1).setMemberId("")).build()
 
   private def leaveGroupRequest = new LeaveGroupRequest.Builder(new LeaveGroupRequestData().setGroupId(group).setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)).build()
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index eadcffb..782f873 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -300,7 +300,7 @@ class RequestQuotaTest extends BaseRequestTest {
           )
 
         case ApiKeys.HEARTBEAT =>
-          new HeartbeatRequest.Builder("test-group", 1, "")
+          new HeartbeatRequest.Builder(new HeartbeatRequestData().setGroupId("test-group").setGenerationid(1).setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID))
 
         case ApiKeys.LEAVE_GROUP =>
           new LeaveGroupRequest.Builder(new LeaveGroupRequestData().setGroupId("test-leave-group").setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID))
@@ -503,7 +503,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.FIND_COORDINATOR =>
         new FindCoordinatorResponse(response, ApiKeys.FIND_COORDINATOR.latestVersion).throttleTimeMs
       case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs
-      case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs
+      case ApiKeys.HEARTBEAT => new HeartbeatResponse(response, ApiKeys.HEARTBEAT.latestVersion).throttleTimeMs
       case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs
       case ApiKeys.SYNC_GROUP => new SyncGroupResponse(response).throttleTimeMs
       case ApiKeys.DESCRIBE_GROUPS =>