You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/16 20:09:15 UTC
[kafka] branch trunk updated: KAFKA-8256;
Replace Heartbeat request/response with automated protocol (#6691)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 855f899 KAFKA-8256; Replace Heartbeat request/response with automated protocol (#6691)
855f899 is described below
commit 855f899bb523f3b334f711926a7db4cc75ebb4b4
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Thu May 16 21:08:49 2019 +0100
KAFKA-8256; Replace Heartbeat request/response with automated protocol (#6691)
Reviewers: Boyang Chen <bc...@outlook.com>, Jason Gustafson <ja...@confluent.io>
---
.../consumer/internals/AbstractCoordinator.java | 6 +-
.../org/apache/kafka/common/protocol/ApiKeys.java | 6 +-
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../kafka/common/requests/HeartbeatRequest.java | 82 +++++-----------------
.../kafka/common/requests/HeartbeatResponse.java | 51 ++++----------
.../kafka/clients/consumer/KafkaConsumerTest.java | 7 +-
.../internals/AbstractCoordinatorTest.java | 3 +-
.../internals/ConsumerCoordinatorTest.java | 3 +-
.../internals/ConsumerNetworkClientTest.java | 9 ++-
.../kafka/common/requests/RequestResponseTest.java | 9 ++-
core/src/main/scala/kafka/server/KafkaApis.scala | 18 +++--
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 4 +-
13 files changed, 76 insertions(+), 126 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 63a7b7c..a1e15f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -891,7 +892,10 @@ public abstract class AbstractCoordinator implements Closeable {
synchronized RequestFuture<Void> sendHeartbeatRequest() {
log.debug("Sending Heartbeat request to coordinator {}", coordinator);
HeartbeatRequest.Builder requestBuilder =
- new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId);
+ new HeartbeatRequest.Builder(new HeartbeatRequestData()
+ .setGroupId(groupId)
+ .setGenerationid(this.generation.generationId)
+ .setMemberId(this.generation.memberId));
return client.send(coordinator, requestBuilder)
.compose(new HeartbeatResponseHandler());
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index a7e3757..6a16578 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -28,6 +28,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
@@ -87,8 +89,6 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.HeartbeatRequest;
-import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.requests.LeaderAndIsrResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
@@ -138,7 +138,7 @@ public enum ApiKeys {
FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequestData.SCHEMAS,
FindCoordinatorResponseData.SCHEMAS),
JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS),
- HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()),
+ HEARTBEAT(12, "Heartbeat", HeartbeatRequestData.SCHEMAS, HeartbeatResponseData.SCHEMAS),
LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
SYNC_GROUP(14, "SyncGroup", SyncGroupRequestData.SCHEMAS, SyncGroupResponseData.SCHEMAS),
DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequestData.SCHEMAS,
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index a7f5a38..32402e4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -87,7 +87,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case JOIN_GROUP:
return new JoinGroupResponse(struct, version);
case HEARTBEAT:
- return new HeartbeatResponse(struct);
+ return new HeartbeatResponse(struct, version);
case LEAVE_GROUP:
return new LeaveGroupResponse(struct, version);
case SYNC_GROUP:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 9a13147..f78cafe 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -16,120 +16,72 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
-import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID;
public class HeartbeatRequest extends AbstractRequest {
- private static final Schema HEARTBEAT_REQUEST_V0 = new Schema(
- GROUP_ID,
- GENERATION_ID,
- MEMBER_ID);
-
- /* v1 request is the same as v0. Throttle time has been added to response */
- private static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0;
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema HEARTBEAT_REQUEST_V2 = HEARTBEAT_REQUEST_V1;
-
- public static Schema[] schemaVersions() {
- return new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1,
- HEARTBEAT_REQUEST_V2};
- }
public static class Builder extends AbstractRequest.Builder<HeartbeatRequest> {
- private final String groupId;
- private final int groupGenerationId;
- private final String memberId;
+ private final HeartbeatRequestData data;
- public Builder(String groupId, int groupGenerationId, String memberId) {
+ public Builder(HeartbeatRequestData data) {
super(ApiKeys.HEARTBEAT);
- this.groupId = groupId;
- this.groupGenerationId = groupGenerationId;
- this.memberId = memberId;
+ this.data = data;
}
@Override
public HeartbeatRequest build(short version) {
- return new HeartbeatRequest(groupId, groupGenerationId, memberId, version);
+ return new HeartbeatRequest(data, version);
}
@Override
public String toString() {
- StringBuilder bld = new StringBuilder();
- bld.append("(type=HeartbeatRequest").
- append(", groupId=").append(groupId).
- append(", groupGenerationId=").append(groupGenerationId).
- append(", memberId=").append(memberId).
- append(")");
- return bld.toString();
+ return data.toString();
}
}
- private final String groupId;
- private final int groupGenerationId;
- private final String memberId;
+ public final HeartbeatRequestData data;
- private HeartbeatRequest(String groupId, int groupGenerationId, String memberId, short version) {
+ private HeartbeatRequest(HeartbeatRequestData data, short version) {
super(ApiKeys.HEARTBEAT, version);
- this.groupId = groupId;
- this.groupGenerationId = groupGenerationId;
- this.memberId = memberId;
+ this.data = data;
}
public HeartbeatRequest(Struct struct, short version) {
super(ApiKeys.HEARTBEAT, version);
- groupId = struct.get(GROUP_ID);
- groupGenerationId = struct.get(GENERATION_ID);
- memberId = struct.get(MEMBER_ID);
+ this.data = new HeartbeatRequestData(struct, version);
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ HeartbeatResponseData response = new HeartbeatResponseData();
+ response.setErrorCode(Errors.forException(e).code());
short versionId = version();
switch (versionId) {
case 0:
- return new HeartbeatResponse(Errors.forException(e));
+ return new HeartbeatResponse(response);
case 1:
case 2:
- return new HeartbeatResponse(throttleTimeMs, Errors.forException(e));
+ response.setThrottleTimeMs(throttleTimeMs);
+ return new HeartbeatResponse(response);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.HEARTBEAT.latestVersion()));
}
}
- public String groupId() {
- return groupId;
- }
-
- public int groupGenerationId() {
- return groupGenerationId;
- }
-
- public String memberId() {
- return memberId;
- }
-
public static HeartbeatRequest parse(ByteBuffer buffer, short version) {
return new HeartbeatRequest(ApiKeys.HEARTBEAT.parseRequest(version, buffer), version);
}
@Override
protected Struct toStruct() {
- Struct struct = new Struct(ApiKeys.HEARTBEAT.requestSchema(version()));
- struct.set(GROUP_ID, groupId);
- struct.set(GENERATION_ID, groupGenerationId);
- struct.set(MEMBER_ID, memberId);
- return struct;
+ return data.toStruct(version());
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index efe2ed8..cc36a20 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -16,35 +16,18 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
public class HeartbeatResponse extends AbstractResponse {
- private static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(
- ERROR_CODE);
- private static final Schema HEARTBEAT_RESPONSE_V1 = new Schema(
- THROTTLE_TIME_MS,
- ERROR_CODE);
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema HEARTBEAT_RESPONSE_V2 = HEARTBEAT_RESPONSE_V1;
-
- public static Schema[] schemaVersions() {
- return new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1,
- HEARTBEAT_RESPONSE_V2};
- }
-
/**
* Possible error codes:
*
@@ -55,47 +38,37 @@ public class HeartbeatResponse extends AbstractResponse {
* REBALANCE_IN_PROGRESS (27)
* GROUP_AUTHORIZATION_FAILED (30)
*/
- private final Errors error;
- private final int throttleTimeMs;
-
- public HeartbeatResponse(Errors error) {
- this(DEFAULT_THROTTLE_TIME, error);
- }
+ private final HeartbeatResponseData data;
- public HeartbeatResponse(int throttleTimeMs, Errors error) {
- this.throttleTimeMs = throttleTimeMs;
- this.error = error;
+ public HeartbeatResponse(HeartbeatResponseData data) {
+ this.data = data;
}
- public HeartbeatResponse(Struct struct) {
- this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
- error = Errors.forCode(struct.get(ERROR_CODE));
+ public HeartbeatResponse(Struct struct, short version) {
+ this.data = new HeartbeatResponseData(struct, version);
}
@Override
public int throttleTimeMs() {
- return throttleTimeMs;
+ return data.throttleTimeMs();
}
public Errors error() {
- return error;
+ return Errors.forCode(data.errorCode());
}
@Override
public Map<Errors, Integer> errorCounts() {
- return errorCounts(error);
+ return Collections.singletonMap(error(), 1);
}
@Override
protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.HEARTBEAT.responseSchema(version));
- struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
- struct.set(ERROR_CODE, error.code());
- return struct;
+ return data.toStruct(version);
}
public static HeartbeatResponse parse(ByteBuffer buffer, short version) {
- return new HeartbeatResponse(ApiKeys.HEARTBEAT.parseResponse(version, buffer));
+ return new HeartbeatResponse(ApiKeys.HEARTBEAT.parseResponse(version, buffer), version);
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 09eb94a..606b711 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
@@ -1440,7 +1441,9 @@ public class KafkaConsumerTest {
public boolean matches(AbstractRequest body) {
return true;
}
- }, new HeartbeatResponse(Errors.REBALANCE_IN_PROGRESS), coordinator);
+ }, new HeartbeatResponse(
+ new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())),
+ coordinator);
// join group
final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(singletonList(topic)));
@@ -1713,7 +1716,7 @@ public class KafkaConsumerTest {
heartbeatReceived.set(true);
return true;
}
- }, new HeartbeatResponse(Errors.NONE), coordinator);
+ }, new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(Errors.NONE.code())), coordinator);
return heartbeatReceived;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 449c58f..c8be163 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -773,7 +774,7 @@ public class AbstractCoordinatorTest {
}
private HeartbeatResponse heartbeatResponse(Errors error) {
- return new HeartbeatResponse(error);
+ return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code()));
}
private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index d8b1252..958e440 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
@@ -2189,7 +2190,7 @@ public class ConsumerCoordinatorTest {
}
private HeartbeatResponse heartbeatResponse(Errors error) {
- return new HeartbeatResponse(error);
+ return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code()));
}
private JoinGroupResponse joinGroupLeaderResponse(int generationId,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 1b7f8fb..f3750aa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
@@ -378,11 +380,14 @@ public class ConsumerNetworkClientTest {
}
private HeartbeatRequest.Builder heartbeat() {
- return new HeartbeatRequest.Builder("group", 1, "memberId");
+ return new HeartbeatRequest.Builder(new HeartbeatRequestData()
+ .setGroupId("group")
+ .setGenerationid(1)
+ .setMemberId("memberId"));
}
private HeartbeatResponse heartbeatResponse(Errors error) {
- return new HeartbeatResponse(error);
+ return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code()));
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 43057a6..d9a9ed1 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -53,6 +53,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -772,11 +774,14 @@ public class RequestResponseTest {
}
private HeartbeatRequest createHeartBeatRequest() {
- return new HeartbeatRequest.Builder("group1", 1, "consumer1").build();
+ return new HeartbeatRequest.Builder(new HeartbeatRequestData()
+ .setGroupId("group1")
+ .setGenerationid(1)
+ .setMemberId("consumer1")).build();
}
private HeartbeatResponse createHeartBeatResponse() {
- return new HeartbeatResponse(Errors.NONE);
+ return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(Errors.NONE.code()));
}
private JoinGroupRequest createJoinGroupRequest(int version) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6bd7174..ef921bf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1424,7 +1424,10 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a heartbeat response
def sendResponseCallback(error: Errors) {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
- val response = new HeartbeatResponse(requestThrottleMs, error)
+ val response = new HeartbeatResponse(
+ new HeartbeatResponseData()
+ .setThrottleTimeMs(requestThrottleMs)
+ .setErrorCode(error.code))
trace("Sending heartbeat response %s for correlation id %d to client %s."
.format(response, request.header.correlationId, request.header.clientId))
response
@@ -1432,15 +1435,18 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseMaybeThrottle(request, createResponse)
}
- if (!authorize(request.session, Read, Resource(Group, heartbeatRequest.groupId, LITERAL))) {
+ if (!authorize(request.session, Read, Resource(Group, heartbeatRequest.data.groupId, LITERAL))) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new HeartbeatResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
+ new HeartbeatResponse(
+ new HeartbeatResponseData()
+ .setThrottleTimeMs(requestThrottleMs)
+ .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)))
} else {
// let the coordinator to handle heartbeat
groupCoordinator.handleHeartbeat(
- heartbeatRequest.groupId,
- heartbeatRequest.memberId,
- heartbeatRequest.groupGenerationId,
+ heartbeatRequest.data.groupId,
+ heartbeatRequest.data.memberId,
+ heartbeatRequest.data.generationid,
sendResponseCallback)
}
}
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index aa6a0dd..fa5e91a 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -380,7 +380,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
).build()
}
- private def heartbeatRequest = new HeartbeatRequest.Builder(group, 1, "").build()
+ private def heartbeatRequest = new HeartbeatRequest.Builder(new HeartbeatRequestData().setGroupId(group).setGenerationid(1).setMemberId("")).build()
private def leaveGroupRequest = new LeaveGroupRequest.Builder(new LeaveGroupRequestData().setGroupId(group).setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)).build()
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index eadcffb..782f873 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -300,7 +300,7 @@ class RequestQuotaTest extends BaseRequestTest {
)
case ApiKeys.HEARTBEAT =>
- new HeartbeatRequest.Builder("test-group", 1, "")
+ new HeartbeatRequest.Builder(new HeartbeatRequestData().setGroupId("test-group").setGenerationid(1).setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID))
case ApiKeys.LEAVE_GROUP =>
new LeaveGroupRequest.Builder(new LeaveGroupRequestData().setGroupId("test-leave-group").setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID))
@@ -503,7 +503,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.FIND_COORDINATOR =>
new FindCoordinatorResponse(response, ApiKeys.FIND_COORDINATOR.latestVersion).throttleTimeMs
case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs
- case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs
+ case ApiKeys.HEARTBEAT => new HeartbeatResponse(response, ApiKeys.HEARTBEAT.latestVersion).throttleTimeMs
case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs
case ApiKeys.SYNC_GROUP => new SyncGroupResponse(response).throttleTimeMs
case ApiKeys.DESCRIBE_GROUPS =>