You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/06 21:26:45 UTC
[kafka] branch trunk updated: KAFKA-8056; Use automatic RPC generation for FindCoordinator (#6408)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 407bcdf KAFKA-8056; Use automatic RPC generation for FindCoordinator (#6408)
407bcdf is described below
commit 407bcdf78e06f83f2b358d2cbd96aed348a5c28f
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Mon May 6 22:26:22 2019 +0100
KAFKA-8056; Use automatic RPC generation for FindCoordinator (#6408)
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../kafka/clients/admin/KafkaAdminClient.java | 23 +++-
.../consumer/internals/AbstractCoordinator.java | 15 ++-
.../producer/internals/TransactionManager.java | 16 ++-
.../org/apache/kafka/common/protocol/ApiKeys.java | 8 +-
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../common/requests/FindCoordinatorRequest.java | 121 ++++++---------------
.../common/requests/FindCoordinatorResponse.java | 114 +++++--------------
.../kafka/clients/admin/KafkaAdminClientTest.java | 16 ++-
.../kafka/clients/consumer/KafkaConsumerTest.java | 23 ++--
.../internals/AbstractCoordinatorTest.java | 2 +-
.../internals/ConsumerCoordinatorTest.java | 2 +-
.../kafka/clients/producer/KafkaProducerTest.java | 6 +-
.../clients/producer/internals/SenderTest.java | 6 +-
.../producer/internals/TransactionManagerTest.java | 10 +-
.../kafka/common/requests/RequestResponseTest.java | 15 ++-
.../runtime/distributed/WorkerCoordinatorTest.java | 13 +--
core/src/main/scala/kafka/admin/AdminClient.scala | 9 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 35 ++++--
.../kafka/api/AuthorizerIntegrationTest.scala | 5 +-
.../integration/kafka/api/ConsumerBounceTest.scala | 6 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 8 +-
21 files changed, 198 insertions(+), 257 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ffe24ca..a0958e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -68,6 +68,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicRe
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet;
@@ -103,6 +104,7 @@ import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
@@ -2538,8 +2540,11 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
- return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
+ FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
+ return new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKeyType(CoordinatorType.GROUP.id())
+ .setKey(groupId));
}
@Override
@@ -2781,8 +2786,11 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
- return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
+ FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
+ return new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKeyType(CoordinatorType.GROUP.id())
+ .setKey(groupId));
}
@Override
@@ -2872,8 +2880,11 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
- return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
+ FindCoordinatorRequest.Builder createRequest(int timeoutMs) {
+ return new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKeyType(CoordinatorType.GROUP.id())
+ .setKey(groupId));
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 0124338..2cf3910 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.errors.MemberIdRequiredException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -42,6 +43,7 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
@@ -661,7 +663,10 @@ public abstract class AbstractCoordinator implements Closeable {
// initiate the group metadata request
log.debug("Sending FindCoordinator request to broker {}", node);
FindCoordinatorRequest.Builder requestBuilder =
- new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
+ new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKeyType(CoordinatorType.GROUP.id())
+ .setKey(this.groupId));
return client.send(node, requestBuilder)
.compose(new FindCoordinatorResponseHandler());
}
@@ -679,12 +684,12 @@ public abstract class AbstractCoordinator implements Closeable {
synchronized (AbstractCoordinator.this) {
// use MAX_VALUE - node.id as the coordinator id to allow separate connections
// for the coordinator in the underlying network client layer
- int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.node().id();
+ int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
AbstractCoordinator.this.coordinator = new Node(
coordinatorConnectionId,
- findCoordinatorResponse.node().host(),
- findCoordinatorResponse.node().port());
+ findCoordinatorResponse.data().host(),
+ findCoordinatorResponse.data().port());
log.info("Discovered group coordinator {}", coordinator);
client.tryConnect(coordinator);
heartbeat.resetSessionTimeout();
@@ -693,7 +698,7 @@ public abstract class AbstractCoordinator implements Closeable {
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
- log.debug("Group coordinator lookup failed: {}", error.message());
+ log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
future.raise(error);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 98fa153..e24d69b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.DefaultRecordBatch;
@@ -38,6 +39,7 @@ import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
@@ -866,7 +868,10 @@ public class TransactionManager {
throw new IllegalStateException("Invalid coordinator type: " + type);
}
- FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey);
+ FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKeyType(type.id())
+ .setKey(coordinatorKey));
enqueueRequest(new FindCoordinatorHandler(builder));
}
@@ -1193,10 +1198,11 @@ public class TransactionManager {
public void handleResponse(AbstractResponse response) {
FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response;
Errors error = findCoordinatorResponse.error();
+ CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType());
if (error == Errors.NONE) {
Node node = findCoordinatorResponse.node();
- switch (builder.coordinatorType()) {
+ switch (coordinatorType) {
case GROUP:
consumerGroupCoordinator = node;
break;
@@ -1209,11 +1215,11 @@ public class TransactionManager {
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
fatalError(error.exception());
} else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) {
- abortableError(new GroupAuthorizationException(builder.coordinatorKey()));
+ abortableError(new GroupAuthorizationException(builder.data().key()));
} else {
fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" +
- "unexpected error: %s", builder.coordinatorType(), builder.coordinatorKey(),
- findCoordinatorResponse.error().message())));
+ "unexpected error: %s", coordinatorType, builder.data().key(),
+ findCoordinatorResponse.data().errorMessage())));
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 8109182..3e2a87a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -26,6 +26,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -83,8 +85,6 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.requests.FindCoordinatorRequest;
-import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
@@ -135,8 +135,8 @@ public enum ApiKeys {
ControlledShutdownResponseData.SCHEMAS),
OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequestData.SCHEMAS, OffsetCommitResponseData.SCHEMAS),
OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), OffsetFetchResponse.schemaVersions()),
- FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequest.schemaVersions(),
- FindCoordinatorResponse.schemaVersions()),
+ FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequestData.SCHEMAS,
+ FindCoordinatorResponseData.SCHEMAS),
JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS),
HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()),
LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index c21fa2b..9c747ec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -83,7 +83,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case OFFSET_FETCH:
return new OffsetFetchResponse(struct);
case FIND_COORDINATOR:
- return new FindCoordinatorResponse(struct);
+ return new FindCoordinatorResponse(struct, version);
case JOIN_GROUP:
return new JoinGroupResponse(struct, version);
case HEARTBEAT:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
index 2d44ab3..0e72843 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -18,122 +18,64 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
-import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT8;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
public class FindCoordinatorRequest extends AbstractRequest {
- private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
- private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";
-
- private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID);
-
- private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
- new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " +
- "for transactional producers, this is the transactional id)"),
- new Field("coordinator_type", INT8, "The type of coordinator to find (0 = group, 1 = transaction)"));
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema FIND_COORDINATOR_REQUEST_V2 = FIND_COORDINATOR_REQUEST_V1;
-
- public static Schema[] schemaVersions() {
- return new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1, FIND_COORDINATOR_REQUEST_V2};
- }
public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> {
- private final String coordinatorKey;
- private final CoordinatorType coordinatorType;
- private final short minVersion;
+ private final FindCoordinatorRequestData data;
- public Builder(CoordinatorType coordinatorType, String coordinatorKey) {
+ public Builder(FindCoordinatorRequestData data) {
super(ApiKeys.FIND_COORDINATOR);
- this.coordinatorType = coordinatorType;
- this.coordinatorKey = coordinatorKey;
- this.minVersion = coordinatorType == CoordinatorType.TRANSACTION ? (short) 1 : (short) 0;
+ this.data = data;
}
@Override
public FindCoordinatorRequest build(short version) {
- if (version < minVersion)
+ if (version < 1 && data.keyType() == CoordinatorType.TRANSACTION.id()) {
throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
- "because we require features supported only in " + minVersion + " or later.");
- return new FindCoordinatorRequest(coordinatorType, coordinatorKey, version);
- }
-
- public String coordinatorKey() {
- return coordinatorKey;
- }
-
- public CoordinatorType coordinatorType() {
- return coordinatorType;
+ "because we require features supported only in 2 or later.");
+ }
+ return new FindCoordinatorRequest(data, version);
}
@Override
public String toString() {
- StringBuilder bld = new StringBuilder();
- bld.append("(type=FindCoordinatorRequest, coordinatorKey=");
- bld.append(coordinatorKey);
- bld.append(", coordinatorType=");
- bld.append(coordinatorType);
- bld.append(")");
- return bld.toString();
+ return data.toString();
+ }
+
+ public FindCoordinatorRequestData data() {
+ return data;
}
}
- private final String coordinatorKey;
- private final CoordinatorType coordinatorType;
+ private final FindCoordinatorRequestData data;
- private FindCoordinatorRequest(CoordinatorType coordinatorType, String coordinatorKey, short version) {
+ private FindCoordinatorRequest(FindCoordinatorRequestData data, short version) {
super(ApiKeys.FIND_COORDINATOR, version);
- this.coordinatorType = coordinatorType;
- this.coordinatorKey = coordinatorKey;
+ this.data = data;
}
public FindCoordinatorRequest(Struct struct, short version) {
super(ApiKeys.FIND_COORDINATOR, version);
-
- if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))
- this.coordinatorType = CoordinatorType.forId(struct.getByte(COORDINATOR_TYPE_KEY_NAME));
- else
- this.coordinatorType = CoordinatorType.GROUP;
- if (struct.hasField(GROUP_ID))
- this.coordinatorKey = struct.get(GROUP_ID);
- else
- this.coordinatorKey = struct.getString(COORDINATOR_KEY_KEY_NAME);
+ this.data = new FindCoordinatorRequestData(struct, version);
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- short versionId = version();
- switch (versionId) {
- case 0:
- return new FindCoordinatorResponse(Errors.forException(e), Node.noNode());
- case 1:
- case 2:
- return new FindCoordinatorResponse(throttleTimeMs, Errors.forException(e), Node.noNode());
-
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ApiKeys.FIND_COORDINATOR.latestVersion()));
+ FindCoordinatorResponseData response = new FindCoordinatorResponseData();
+ if (version() >= 2) {
+ response.setThrottleTimeMs(throttleTimeMs);
}
- }
-
- public String coordinatorKey() {
- return coordinatorKey;
- }
-
- public CoordinatorType coordinatorType() {
- return coordinatorType;
+ Errors error = Errors.forException(e);
+ return FindCoordinatorResponse.prepareResponse(error, Node.noNode());
}
public static FindCoordinatorRequest parse(ByteBuffer buffer, short version) {
@@ -142,14 +84,11 @@ public class FindCoordinatorRequest extends AbstractRequest {
@Override
protected Struct toStruct() {
- Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.requestSchema(version()));
- if (struct.hasField(GROUP_ID))
- struct.set(GROUP_ID, coordinatorKey);
- else
- struct.set(COORDINATOR_KEY_KEY_NAME, coordinatorKey);
- if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))
- struct.set(COORDINATOR_TYPE_KEY_NAME, coordinatorType.id);
- return struct;
+ return data.toStruct(version());
+ }
+
+ public FindCoordinatorRequestData data() {
+ return data;
}
public enum CoordinatorType {
@@ -161,6 +100,10 @@ public class FindCoordinatorRequest extends AbstractRequest {
this.id = id;
}
+ public byte id() {
+ return id;
+ }
+
public static CoordinatorType forId(byte id) {
switch (id) {
case 0:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index bc7f654..c880408 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -17,53 +17,17 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
public class FindCoordinatorResponse extends AbstractResponse {
- private static final String COORDINATOR_KEY_NAME = "coordinator";
-
- // coordinator level field names
- private static final String NODE_ID_KEY_NAME = "node_id";
- private static final String HOST_KEY_NAME = "host";
- private static final String PORT_KEY_NAME = "port";
-
- private static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema(
- new Field(NODE_ID_KEY_NAME, INT32, "The broker id."),
- new Field(HOST_KEY_NAME, STRING, "The hostname of the broker."),
- new Field(PORT_KEY_NAME, INT32, "The port on which the broker accepts requests."));
-
- private static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema(
- ERROR_CODE,
- new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator " +
- "for a consumer group."));
-
- private static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
- THROTTLE_TIME_MS,
- ERROR_CODE,
- ERROR_MESSAGE,
- new Field(COORDINATOR_KEY_NAME, FIND_COORDINATOR_BROKER_V0, "Host and port information for the coordinator"));
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema FIND_COORDINATOR_RESPONSE_V2 = FIND_COORDINATOR_RESPONSE_V1;
-
- public static Schema[] schemaVersions() {
- return new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1, FIND_COORDINATOR_RESPONSE_V2};
- }
/**
* Possible error codes:
@@ -75,88 +39,68 @@ public class FindCoordinatorResponse extends AbstractResponse {
* TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53)
*/
+ private final FindCoordinatorResponseData data;
- private final int throttleTimeMs;
- private final String errorMessage;
- private final Errors error;
- private final Node node;
-
- public FindCoordinatorResponse(Errors error, Node node) {
- this(DEFAULT_THROTTLE_TIME, error, node);
+ public FindCoordinatorResponse(FindCoordinatorResponseData data) {
+ this.data = data;
}
- public FindCoordinatorResponse(int throttleTimeMs, Errors error, Node node) {
- this.throttleTimeMs = throttleTimeMs;
- this.error = error;
- this.node = node;
- this.errorMessage = null;
+ public FindCoordinatorResponse(Struct struct, short version) {
+ this.data = new FindCoordinatorResponseData(struct, version);
}
- public FindCoordinatorResponse(Struct struct) {
- this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
- error = Errors.forCode(struct.get(ERROR_CODE));
- errorMessage = struct.getOrElse(ERROR_MESSAGE, null);
+ public FindCoordinatorResponseData data() {
+ return data;
+ }
- Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
- int nodeId = broker.getInt(NODE_ID_KEY_NAME);
- String host = broker.getString(HOST_KEY_NAME);
- int port = broker.getInt(PORT_KEY_NAME);
- node = new Node(nodeId, host, port);
+ public Node node() {
+ return new Node(data.nodeId(), data.host(), data.port());
}
@Override
public int throttleTimeMs() {
- return throttleTimeMs;
+ return data.throttleTimeMs();
}
public boolean hasError() {
- return this.error != Errors.NONE;
+ return error() != Errors.NONE;
}
public Errors error() {
- return error;
+ return Errors.forCode(data.errorCode());
}
@Override
public Map<Errors, Integer> errorCounts() {
- return errorCounts(error);
- }
-
- public Node node() {
- return node;
+ return Collections.singletonMap(error(), 1);
}
@Override
protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.responseSchema(version));
- struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
- struct.set(ERROR_CODE, error.code());
- struct.setIfExists(ERROR_MESSAGE, errorMessage);
-
- Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
- coordinator.set(NODE_ID_KEY_NAME, node.id());
- coordinator.set(HOST_KEY_NAME, node.host());
- coordinator.set(PORT_KEY_NAME, node.port());
- struct.set(COORDINATOR_KEY_NAME, coordinator);
- return struct;
+ return data.toStruct(version);
}
public static FindCoordinatorResponse parse(ByteBuffer buffer, short version) {
- return new FindCoordinatorResponse(ApiKeys.FIND_COORDINATOR.responseSchema(version).read(buffer));
+ return new FindCoordinatorResponse(ApiKeys.FIND_COORDINATOR.responseSchema(version).read(buffer), version);
}
@Override
public String toString() {
- return "FindCoordinatorResponse(" +
- "throttleTimeMs=" + throttleTimeMs +
- ", errorMessage='" + errorMessage + '\'' +
- ", error=" + error +
- ", node=" + node +
- ')';
+ return data.toString();
}
@Override
public boolean shouldClientThrottle(short version) {
return version >= 2;
}
+
+ public static FindCoordinatorResponse prepareResponse(Errors error, Node node) {
+ FindCoordinatorResponseData data = new FindCoordinatorResponseData();
+ data.setErrorCode(error.code())
+ .setErrorMessage(error.message())
+ .setNodeId(node.id())
+ .setHost(node.host())
+ .setPort(node.port());
+ return new FindCoordinatorResponse(data);
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 1367b94..687dad2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -236,6 +236,10 @@ public class KafkaAdminClientTest {
return new DeleteTopicsResponse(data);
}
+ private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors error, Node node) {
+ return FindCoordinatorResponse.prepareResponse(error, node);
+ }
+
/**
* Test that the client properly times out when we don't receive any metadata.
*/
@@ -1072,7 +1076,7 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
DescribeGroupsResponseData data = new DescribeGroupsResponseData();
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
@@ -1139,7 +1143,7 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
@@ -1183,10 +1187,10 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
//Retriable FindCoordinatorResponse errors should be retried
- env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
- env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
- env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
final Map<String, Errors> response = new HashMap<>();
response.put("group-0", Errors.NONE);
@@ -1198,7 +1202,7 @@ public class KafkaAdminClientTest {
assertNull(results.get());
//should throw error for non-retriable errors
- env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8f23d41..ccd9e94 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -580,7 +580,7 @@ public class KafkaConsumerTest {
true, groupId, groupInstanceId);
consumer.assign(singletonList(tp0));
- client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// lookup committed offset and find nothing
@@ -604,7 +604,7 @@ public class KafkaConsumerTest {
true, groupId, groupInstanceId);
consumer.assign(singletonList(tp0));
- client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 539L), Errors.NONE), coordinator);
@@ -629,7 +629,7 @@ public class KafkaConsumerTest {
true, groupId, groupInstanceId);
consumer.assign(singletonList(tp0));
- client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator);
@@ -679,7 +679,7 @@ public class KafkaConsumerTest {
consumer.assign(singletonList(tp0));
// lookup coordinator
- client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// fetch offset for one topic
@@ -1116,7 +1116,7 @@ public class KafkaConsumerTest {
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
// lookup coordinator
- client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// manual assignment
@@ -1172,7 +1172,7 @@ public class KafkaConsumerTest {
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
// lookup coordinator
- client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// manual assignment
@@ -1226,7 +1226,7 @@ public class KafkaConsumerTest {
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
// lookup coordinator
- client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// manual assignment
@@ -1420,7 +1420,7 @@ public class KafkaConsumerTest {
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
- client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
@@ -1465,7 +1465,7 @@ public class KafkaConsumerTest {
client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator, true);
// should try and find the new coordinator
- client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
// rejoin group
client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
@@ -1663,7 +1663,7 @@ public class KafkaConsumerTest {
private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
if (coordinator == null) {
// lookup coordinator
- client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
}
@@ -1691,7 +1691,7 @@ public class KafkaConsumerTest {
private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
if (coordinator == null) {
// lookup coordinator
- client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
}
@@ -1797,7 +1797,6 @@ public class KafkaConsumerTest {
return new ListOffsetResponse(partitionData);
}
-
private FetchResponse<MemoryRecords> fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> tpResponses = new LinkedHashMap<>();
for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index c3f0ff5..5aaf476 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -768,7 +768,7 @@ public class AbstractCoordinatorTest {
}
private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
- return new FindCoordinatorResponse(error, node);
+ return FindCoordinatorResponse.prepareResponse(error, node);
}
private HeartbeatResponse heartbeatResponse(Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3f9d89f..a60316f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2184,7 +2184,7 @@ public class ConsumerCoordinatorTest {
}
private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
- return new FindCoordinatorResponse(error, node);
+ return FindCoordinatorResponse.prepareResponse(error, node);
}
private HeartbeatResponse heartbeatResponse(Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index d6b3d74..170b75e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -662,7 +662,7 @@ public class KafkaProducerTest {
Node node = metadata.fetch().nodes().get(0);
client.throttle(node, 5000);
- client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1));
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
@@ -855,7 +855,7 @@ public class KafkaProducerTest {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CountDownLatch assertionDoneLatch = new CountDownLatch(1);
- client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1));
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
executorService.submit(() -> {
assertThrows(KafkaException.class, producer::initTransactions);
assertionDoneLatch.countDown();
@@ -884,7 +884,7 @@ public class KafkaProducerTest {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CountDownLatch assertionDoneLatch = new CountDownLatch(1);
- client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1));
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
executorService.submit(() -> {
assertThrows(KafkaException.class, producer::initTransactions);
assertionDoneLatch.countDown();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 5e4e31e..04197d8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -265,7 +265,6 @@ public class SenderTest {
* Send multiple requests. Verify that the client side quota metrics have the right values
*/
@Test
- @SuppressWarnings("deprecation")
public void testQuotaMetrics() throws Exception {
MockSelector selector = new MockSelector(time);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(this.senderMetricsRegistry);
@@ -1811,7 +1810,6 @@ public class SenderTest {
}
@Test
- @SuppressWarnings("deprecation")
public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
@@ -1904,7 +1902,6 @@ public class SenderTest {
testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp);
}
- @SuppressWarnings("deprecation")
private void testSplitBatchAndSend(TransactionManager txnManager,
ProducerIdAndEpoch producerIdAndEpoch,
TopicPartition tp) throws Exception {
@@ -2473,7 +2470,8 @@ public class SenderTest {
}
private void prepareFindCoordinatorResponse(Errors error) {
- client.prepareResponse(new FindCoordinatorResponse(error, metadata.fetch().nodes().get(0)));
+ Node node = metadata.fetch().nodes().get(0);
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(error, node));
}
private void prepareInitProducerResponse(Errors error, long producerId, short producerEpoch) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 2d3487f..ca03b76 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -687,8 +687,8 @@ public class TransactionManagerTest {
transactionManager.initializeTransactions();
client.prepareUnsupportedVersionResponse(body -> {
FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
- assertEquals(findCoordinatorRequest.coordinatorType(), CoordinatorType.TRANSACTION);
- assertEquals(findCoordinatorRequest.coordinatorKey(), transactionalId);
+ assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()), CoordinatorType.TRANSACTION);
+ assertEquals(findCoordinatorRequest.data().key(), transactionalId);
return true;
});
@@ -2381,10 +2381,10 @@ public class TransactionManagerTest {
final String coordinatorKey) {
client.prepareResponse(body -> {
FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
- assertEquals(findCoordinatorRequest.coordinatorType(), coordinatorType);
- assertEquals(findCoordinatorRequest.coordinatorKey(), coordinatorKey);
+ assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()), coordinatorType);
+ assertEquals(findCoordinatorRequest.data().key(), coordinatorKey);
return true;
- }, new FindCoordinatorResponse(error, brokerNode), shouldDisconnect);
+ }, FindCoordinatorResponse.prepareResponse(error, brokerNode), shouldDisconnect);
}
private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b6a2dad..61ad3b3 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -52,6 +52,7 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPar
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -84,6 +85,7 @@ import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
@@ -445,7 +447,10 @@ public class RequestResponseTest {
@Test(expected = UnsupportedVersionException.class)
public void cannotUseFindCoordinatorV0ToFindTransactionCoordinator() {
- FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
+ FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKeyType(CoordinatorType.TRANSACTION.id)
+ .setKey("foobar"));
builder.build((short) 0);
}
@@ -697,12 +702,16 @@ public class RequestResponseTest {
}
private FindCoordinatorRequest createFindCoordinatorRequest(int version) {
- return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
+ return new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKeyType(CoordinatorType.GROUP.id())
+ .setKey("test-group"))
.build((short) version);
}
private FindCoordinatorResponse createFindCoordinatorResponse() {
- return new FindCoordinatorResponse(Errors.NONE, new Node(10, "host1", 2014));
+ Node node = new Node(10, "host1", 2014);
+ return FindCoordinatorResponse.prepareResponse(Errors.NONE, node);
}
private FetchRequest createFetchRequest(int version, FetchMetadata metadata, List<TopicPartition> toForget) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index c78c7a4..cda5f61 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -212,7 +212,7 @@ public class WorkerCoordinatorTest {
final String consumerId = "leader";
- client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// normal join group
@@ -252,7 +252,7 @@ public class WorkerCoordinatorTest {
final String memberId = "member";
- client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// normal join group
@@ -293,7 +293,7 @@ public class WorkerCoordinatorTest {
final String memberId = "member";
- client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// config mismatch results in assignment error
@@ -324,7 +324,7 @@ public class WorkerCoordinatorTest {
PowerMock.replayAll();
- client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, node));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// join the group once
@@ -488,11 +488,6 @@ public class WorkerCoordinatorTest {
PowerMock.verifyAll();
}
-
- private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
- return new FindCoordinatorResponse(error, node);
- }
-
private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,
Map<String, Long> configOffsets, Errors error) {
List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 17716d3..c78a451 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -28,7 +28,8 @@ import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException}
import org.apache.kafka.common.internals.ClusterResourceListeners
-import org.apache.kafka.common.message.{DescribeGroupsRequestData, DescribeGroupsResponseData}
+import org.apache.kafka.common.message.{DescribeGroupsRequestData, DescribeGroupsResponseData, FindCoordinatorRequestData}
+
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -41,6 +42,7 @@ import org.apache.kafka.common.{Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
/**
* A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
@@ -108,7 +110,10 @@ class AdminClient(val time: Time,
}
def findCoordinator(groupId: String, timeoutMs: Long = 0): Node = {
- val requestBuilder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId)
+ val requestBuilder = new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKeyType(CoordinatorType.GROUP.id)
+ .setKey(groupId))
def sendRequest: Try[FindCoordinatorResponse] =
Try(sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse])
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4e4c225..34ed7d7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -57,6 +57,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.PatternType.LITERAL
@@ -1171,22 +1172,22 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleFindCoordinatorRequest(request: RequestChannel.Request) {
val findCoordinatorRequest = request.body[FindCoordinatorRequest]
- if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
- !authorize(request.session, Describe, Resource(Group, findCoordinatorRequest.coordinatorKey, LITERAL)))
+ if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id &&
+ !authorize(request.session, Describe, Resource(Group, findCoordinatorRequest.data.key, LITERAL)))
sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
- else if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION &&
- !authorize(request.session, Describe, Resource(TransactionalId, findCoordinatorRequest.coordinatorKey, LITERAL)))
+ else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id &&
+ !authorize(request.session, Describe, Resource(TransactionalId, findCoordinatorRequest.data.key, LITERAL)))
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
else {
// get metadata (and create the topic if necessary)
- val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match {
- case FindCoordinatorRequest.CoordinatorType.GROUP =>
- val partition = groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
+ val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+ case CoordinatorType.GROUP =>
+ val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
(partition, metadata)
- case FindCoordinatorRequest.CoordinatorType.TRANSACTION =>
- val partition = txnCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
+ case CoordinatorType.TRANSACTION =>
+ val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
(partition, metadata)
@@ -1195,8 +1196,18 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def createResponse(requestThrottleMs: Int): AbstractResponse = {
+ def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
+ new FindCoordinatorResponse(
+ new FindCoordinatorResponseData()
+ .setErrorCode(error.code)
+ .setErrorMessage(error.message)
+ .setNodeId(node.id)
+ .setHost(node.host)
+ .setPort(node.port)
+ .setThrottleTimeMs(requestThrottleMs))
+ }
val responseBody = if (topicMetadata.error != Errors.NONE) {
- new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+ createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
} else {
val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
.find(_.partition == partition)
@@ -1205,9 +1216,9 @@ class KafkaApis(val requestChannel: RequestChannel,
coordinatorEndpoint match {
case Some(endpoint) if !endpoint.isEmpty =>
- new FindCoordinatorResponse(requestThrottleMs, Errors.NONE, endpoint)
+ createFindCoordinatorResponse(Errors.NONE, endpoint)
case _ =>
- new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+ createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
}
}
trace("Sending FindCoordinator response %s for correlation id %d to client %s."
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e7a094f..ee1c02b 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -302,7 +302,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createFindCoordinatorRequest = {
- new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build()
+ new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id)
+ .setKey(group)).build()
}
private def createUpdateMetadataRequest = {
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 385eb8f..ae6fc00 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.GroupMaxSizeReachedException
+import org.apache.kafka.common.message.FindCoordinatorRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
import org.junit.Assert._
@@ -254,7 +255,10 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}
private def findCoordinator(group: String): Int = {
- val request = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build()
+ val request = new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id)
+ .setKey(group)).build()
var nodeId = -1
TestUtils.waitUntilTrue(() => {
val resp = connectAndSend(request, ApiKeys.FIND_COORDINATOR)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index d9c8bb5..b4c1268 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -274,7 +274,10 @@ class RequestQuotaTest extends BaseRequestTest {
new OffsetFetchRequest.Builder("test-group", List(tp).asJava)
case ApiKeys.FIND_COORDINATOR =>
- new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
+ new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+ .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id)
+ .setKey("test-group"))
case ApiKeys.JOIN_GROUP =>
new JoinGroupRequest.Builder(
@@ -489,7 +492,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.OFFSET_COMMIT =>
new OffsetCommitResponse(response, ApiKeys.OFFSET_COMMIT.latestVersion).throttleTimeMs
case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs
- case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs
+ case ApiKeys.FIND_COORDINATOR =>
+ new FindCoordinatorResponse(response, ApiKeys.FIND_COORDINATOR.latestVersion).throttleTimeMs
case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs
case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs
case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs