You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/03/29 21:32:47 UTC
[kafka] branch trunk updated: KAFKA-8034: Use automatic RPC generation in DeleteTopics
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 981815c KAFKA-8034: Use automatic RPC generation in DeleteTopics
981815c is described below
commit 981815c8d14daf1042e06f8fa9cb355187719b1d
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Fri Mar 29 21:32:36 2019 +0000
KAFKA-8034: Use automatic RPC generation in DeleteTopics
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../kafka/clients/admin/KafkaAdminClient.java | 16 ++--
.../org/apache/kafka/common/protocol/ApiKeys.java | 6 +-
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../kafka/common/requests/DeleteTopicsRequest.java | 86 +++++++-------------
.../common/requests/DeleteTopicsResponse.java | 93 +++++-----------------
.../common/message/DeleteTopicsResponse.json | 6 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 16 +++-
.../kafka/common/requests/RequestResponseTest.java | 20 +++--
core/src/main/scala/kafka/server/KafkaApis.scala | 75 ++++++++++-------
.../kafka/api/AuthorizerIntegrationTest.scala | 18 +++--
.../kafka/server/DeleteTopicsRequestTest.scala | 55 ++++++++-----
...leteTopicsRequestWithDeletionDisabledTest.scala | 17 ++--
.../scala/unit/kafka/server/RequestQuotaTest.scala | 10 ++-
13 files changed, 206 insertions(+), 214 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 317cd7c..336597f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -63,6 +63,8 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteTopicsRequestData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
@@ -1365,14 +1367,16 @@ public class KafkaAdminClient extends AdminClient {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
- return new DeleteTopicsRequest.Builder(new HashSet<>(validTopicNames), timeoutMs);
+ return new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+ .setTopicNames(validTopicNames)
+ .setTimeoutMs(timeoutMs));
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse;
// Check for controller change
- for (Errors error : response.errors().values()) {
+ for (Errors error : response.errorCounts().keySet()) {
if (error == Errors.NOT_CONTROLLER) {
metadataManager.clearController();
metadataManager.requestUpdate();
@@ -1380,12 +1384,12 @@ public class KafkaAdminClient extends AdminClient {
}
}
// Handle server responses for particular topics.
- for (Map.Entry<String, Errors> entry : response.errors().entrySet()) {
- KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
+ for (DeletableTopicResult result : response.data().responses()) {
+ KafkaFutureImpl<Void> future = topicFutures.get(result.name());
if (future == null) {
- log.warn("Server response mentioned unknown topic {}", entry.getKey());
+ log.warn("Server response mentioned unknown topic {}", result.name());
} else {
- ApiException exception = entry.getValue().exception();
+ ApiException exception = Errors.forCode(result.errorCode()).exception();
if (exception != null) {
future.completeExceptionally(exception);
} else {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 3f8d80d..ed9787f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -18,6 +18,8 @@ package org.apache.kafka.common.protocol;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.DeleteTopicsRequestData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
@@ -61,8 +63,6 @@ import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
import org.apache.kafka.common.requests.DeleteRecordsResponse;
-import org.apache.kafka.common.requests.DeleteTopicsRequest;
-import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.DescribeAclsRequest;
import org.apache.kafka.common.requests.DescribeAclsResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
@@ -153,7 +153,7 @@ public enum ApiKeys {
}
},
CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS),
- DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequest.schemaVersions(), DeleteTopicsResponse.schemaVersions()),
+ DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS),
DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()),
INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequest.schemaVersions(),
InitProducerIdResponse.schemaVersions()),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index eadd302..f74e1ae 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -111,7 +111,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case CREATE_TOPICS:
return new CreateTopicsResponse(struct, version);
case DELETE_TOPICS:
- return new DeleteTopicsResponse(struct);
+ return new DeleteTopicsResponse(struct, version);
case DELETE_RECORDS:
return new DeleteRecordsResponse(struct);
case INIT_PRODUCER_ID:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index facb55e..978d1c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -16,19 +16,16 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.DeleteTopicsRequestData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
import static org.apache.kafka.common.protocol.types.Type.INT32;
import static org.apache.kafka.common.protocol.types.Type.STRING;
@@ -62,85 +59,62 @@ public class DeleteTopicsRequest extends AbstractRequest {
DELETE_TOPICS_REQUEST_V2, DELETE_TOPICS_REQUEST_V3};
}
- private final Set<String> topics;
- private final Integer timeout;
+ private DeleteTopicsRequestData data;
+ private final short version;
public static class Builder extends AbstractRequest.Builder<DeleteTopicsRequest> {
- private final Set<String> topics;
- private final Integer timeout;
+ private DeleteTopicsRequestData data;
- public Builder(Set<String> topics, Integer timeout) {
+ public Builder(DeleteTopicsRequestData data) {
super(ApiKeys.DELETE_TOPICS);
- this.topics = topics;
- this.timeout = timeout;
+ this.data = data;
}
@Override
public DeleteTopicsRequest build(short version) {
- return new DeleteTopicsRequest(topics, timeout, version);
+ return new DeleteTopicsRequest(data, version);
}
@Override
public String toString() {
- StringBuilder bld = new StringBuilder();
- bld.append("(type=DeleteTopicsRequest").
- append(", topics=(").append(Utils.join(topics, ", ")).append(")").
- append(", timeout=").append(timeout).
- append(")");
- return bld.toString();
+ return data.toString();
}
}
- private DeleteTopicsRequest(Set<String> topics, Integer timeout, short version) {
+ private DeleteTopicsRequest(DeleteTopicsRequestData data, short version) {
super(ApiKeys.DELETE_TOPICS, version);
- this.topics = topics;
- this.timeout = timeout;
+ this.data = data;
+ this.version = version;
}
public DeleteTopicsRequest(Struct struct, short version) {
super(ApiKeys.DELETE_TOPICS, version);
- Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
- Set<String> topics = new HashSet<>(topicsArray.length);
- for (Object topic : topicsArray)
- topics.add((String) topic);
-
- this.topics = topics;
- this.timeout = struct.getInt(TIMEOUT_KEY_NAME);
+ this.data = new DeleteTopicsRequestData(struct, version);
+ this.version = version;
}
@Override
protected Struct toStruct() {
- Struct struct = new Struct(ApiKeys.DELETE_TOPICS.requestSchema(version()));
- struct.set(TOPICS_KEY_NAME, topics.toArray());
- struct.set(TIMEOUT_KEY_NAME, timeout);
- return struct;
+ return data.toStruct(version);
+ }
+
+ public DeleteTopicsRequestData data() {
+ return data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- Map<String, Errors> topicErrors = new HashMap<>();
- for (String topic : topics)
- topicErrors.put(topic, Errors.forException(e));
-
- switch (version()) {
- case 0:
- return new DeleteTopicsResponse(topicErrors);
- case 1:
- case 2:
- case 3:
- return new DeleteTopicsResponse(throttleTimeMs, topicErrors);
- default:
- throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- version(), this.getClass().getSimpleName(), ApiKeys.DELETE_TOPICS.latestVersion()));
+ DeleteTopicsResponseData response = new DeleteTopicsResponseData();
+ if (version >= 1) {
+ response.setThrottleTimeMs(throttleTimeMs);
}
- }
-
- public Set<String> topics() {
- return topics;
- }
-
- public Integer timeout() {
- return this.timeout;
+ ApiError apiError = ApiError.fromThrowable(e);
+ for (String topic : data.topicNames()) {
+ response.responses().add(new DeletableTopicResult()
+ .setName(topic)
+ .setErrorCode(apiError.error().code()));
+ }
+ return new DeleteTopicsResponse(response);
}
public static DeleteTopicsRequest parse(ByteBuffer buffer, short version) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index 650caa8..aa8e552 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -16,54 +16,18 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
public class DeleteTopicsResponse extends AbstractResponse {
- private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
-
- private static final Schema TOPIC_ERROR_CODE = new Schema(
- TOPIC_NAME,
- ERROR_CODE);
-
- private static final Schema DELETE_TOPICS_RESPONSE_V0 = new Schema(
- new Field(TOPIC_ERROR_CODES_KEY_NAME,
- new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes."));
-
- private static final Schema DELETE_TOPICS_RESPONSE_V1 = new Schema(
- THROTTLE_TIME_MS,
- new Field(TOPIC_ERROR_CODES_KEY_NAME, new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes."));
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema DELETE_TOPICS_RESPONSE_V2 = DELETE_TOPICS_RESPONSE_V1;
-
- /**
- * v3 request is the same that as v2. The response is different based on the request version.
- * In v3 version a TopicDeletionDisabledException is returned
- */
- private static final Schema DELETE_TOPICS_RESPONSE_V3 = DELETE_TOPICS_RESPONSE_V2;
-
- public static Schema[] schemaVersions() {
- return new Schema[]{DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1,
- DELETE_TOPICS_RESPONSE_V2, DELETE_TOPICS_RESPONSE_V3};
- }
-
/**
* Possible error codes:
@@ -75,63 +39,42 @@ public class DeleteTopicsResponse extends AbstractResponse {
* INVALID_REQUEST(42)
* TOPIC_DELETION_DISABLED(73)
*/
- private final Map<String, Errors> errors;
- private final int throttleTimeMs;
+ private DeleteTopicsResponseData data;
- public DeleteTopicsResponse(Map<String, Errors> errors) {
- this(DEFAULT_THROTTLE_TIME, errors);
+ public DeleteTopicsResponse(DeleteTopicsResponseData data) {
+ this.data = data;
}
- public DeleteTopicsResponse(int throttleTimeMs, Map<String, Errors> errors) {
- this.throttleTimeMs = throttleTimeMs;
- this.errors = errors;
- }
-
- public DeleteTopicsResponse(Struct struct) {
- this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
- Object[] topicErrorCodesStructs = struct.getArray(TOPIC_ERROR_CODES_KEY_NAME);
- Map<String, Errors> errors = new HashMap<>();
- for (Object topicErrorCodeStructObj : topicErrorCodesStructs) {
- Struct topicErrorCodeStruct = (Struct) topicErrorCodeStructObj;
- String topic = topicErrorCodeStruct.get(TOPIC_NAME);
- Errors error = Errors.forCode(topicErrorCodeStruct.get(ERROR_CODE));
- errors.put(topic, error);
- }
-
- this.errors = errors;
+ public DeleteTopicsResponse(Struct struct, short version) {
+ this.data = new DeleteTopicsResponseData(struct, version);
}
@Override
protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.DELETE_TOPICS.responseSchema(version));
- struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
- List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size());
- for (Map.Entry<String, Errors> topicError : errors.entrySet()) {
- Struct topicErrorCodeStruct = struct.instance(TOPIC_ERROR_CODES_KEY_NAME);
- topicErrorCodeStruct.set(TOPIC_NAME, topicError.getKey());
- topicErrorCodeStruct.set(ERROR_CODE, topicError.getValue().code());
- topicErrorCodeStructs.add(topicErrorCodeStruct);
- }
- struct.set(TOPIC_ERROR_CODES_KEY_NAME, topicErrorCodeStructs.toArray());
- return struct;
+ return data.toStruct(version);
}
@Override
public int throttleTimeMs() {
- return throttleTimeMs;
+ return data.throttleTimeMs();
}
- public Map<String, Errors> errors() {
- return errors;
+ public DeleteTopicsResponseData data() {
+ return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
- return errorCounts(errors);
+ HashMap<Errors, Integer> counts = new HashMap<>();
+ for (DeletableTopicResult result : data.responses()) {
+ Errors error = Errors.forCode(result.errorCode());
+ counts.put(error, counts.getOrDefault(error, 0) + 1);
+ }
+ return counts;
}
public static DeleteTopicsResponse parse(ByteBuffer buffer, short version) {
- return new DeleteTopicsResponse(ApiKeys.DELETE_TOPICS.responseSchema(version).read(buffer));
+ return new DeleteTopicsResponse(ApiKeys.DELETE_TOPICS.parseResponse(version, buffer), version);
}
@Override
diff --git a/clients/src/main/resources/common/message/DeleteTopicsResponse.json b/clients/src/main/resources/common/message/DeleteTopicsResponse.json
index cf0837b..4cea44b 100644
--- a/clients/src/main/resources/common/message/DeleteTopicsResponse.json
+++ b/clients/src/main/resources/common/message/DeleteTopicsResponse.json
@@ -22,11 +22,11 @@
// Starting in version 3, a TOPIC_DELETION_DISABLED error code may be returned.
"validVersions": "0-3",
"fields": [
- { "name": "throttleTimeMs", "type": "int32", "versions": "1+",
+ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+",
- "about": "The results for each topic.", "fields": [
- { "name": "Name", "type": "string", "versions": "0+",
+ "about": "The results for each topic we tried to delete.", "fields": [
+ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
"about": "The topic name" },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The deletion error, or 0 if the deletion succeeded." }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index af6f49b..220fc50 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -53,6 +53,8 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
@@ -223,6 +225,14 @@ public class KafkaAdminClientTest {
return new CreateTopicsResponse(data);
}
+ private static DeleteTopicsResponse prepareDeleteTopicsResponse(String topicName, Errors error) {
+ DeleteTopicsResponseData data = new DeleteTopicsResponseData();
+ data.responses().add(new DeletableTopicResult()
+ .setName(topicName)
+ .setErrorCode(error.code()));
+ return new DeleteTopicsResponse(data);
+ }
+
/**
* Test that the client properly times out when we don't receive any metadata.
*/
@@ -394,19 +404,19 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
- new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.NONE)));
+ prepareDeleteTopicsResponse("myTopic", Errors.NONE));
KafkaFuture<Void> future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
new DeleteTopicsOptions()).all();
future.get();
env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
- new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.TOPIC_DELETION_DISABLED)));
+ prepareDeleteTopicsResponse("myTopic", Errors.TOPIC_DELETION_DISABLED));
future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
new DeleteTopicsOptions()).all();
TestUtils.assertFutureError(future, TopicDeletionDisabledException.class);
env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
- new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+ prepareDeleteTopicsResponse("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION));
future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
new DeleteTopicsOptions()).all();
TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class);
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 5690743..5b92b1b 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -38,6 +38,9 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteTopicsRequestData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData;
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
@@ -1124,14 +1127,21 @@ public class RequestResponseTest {
}
private DeleteTopicsRequest createDeleteTopicsRequest() {
- return new DeleteTopicsRequest.Builder(Utils.mkSet("my_t1", "my_t2"), 10000).build();
+ return new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData()
+ .setTopicNames(Arrays.asList("my_t1", "my_t2"))
+ .setTimeoutMs(1000)).build();
}
private DeleteTopicsResponse createDeleteTopicsResponse() {
- Map<String, Errors> errors = new HashMap<>();
- errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION);
- errors.put("t2", Errors.TOPIC_AUTHORIZATION_FAILED);
- return new DeleteTopicsResponse(errors);
+ DeleteTopicsResponseData data = new DeleteTopicsResponseData();
+ data.responses().add(new DeletableTopicResult()
+ .setName("t1")
+ .setErrorCode(Errors.INVALID_TOPIC_EXCEPTION.code()));
+ data.responses().add(new DeletableTopicResult()
+ .setName("t2")
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()));
+ return new DeleteTopicsResponse(data);
}
private InitProducerIdRequest createInitPidRequest() {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 68d0823..fc06162 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -47,6 +47,8 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
+import org.apache.kafka.common.message.DeleteTopicsResponseData
+import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultSet}
import org.apache.kafka.common.message.DescribeGroupsResponseData
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
import org.apache.kafka.common.message.JoinGroupResponseData
@@ -1565,51 +1567,66 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleDeleteTopicsRequest(request: RequestChannel.Request) {
- val deleteTopicRequest = request.body[DeleteTopicsRequest]
-
- val unauthorizedTopicErrors = mutable.Map[String, Errors]()
- val nonExistingTopicErrors = mutable.Map[String, Errors]()
- val authorizedForDeleteTopics = mutable.Set[String]()
-
- for (topic <- deleteTopicRequest.topics.asScala) {
- if (!authorize(request.session, Delete, Resource(Topic, topic, LITERAL)))
- unauthorizedTopicErrors += topic -> Errors.TOPIC_AUTHORIZATION_FAILED
- else if (!metadataCache.contains(topic))
- nonExistingTopicErrors += topic -> Errors.UNKNOWN_TOPIC_OR_PARTITION
- else
- authorizedForDeleteTopics.add(topic)
- }
-
- def sendResponseCallback(authorizedTopicErrors: Map[String, Errors]): Unit = {
+ def sendResponseCallback(results: DeletableTopicResultSet): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
- val completeResults = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ authorizedTopicErrors
- val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava)
+ val responseData = new DeleteTopicsResponseData()
+ .setThrottleTimeMs(requestThrottleMs)
+ .setResponses(results)
+ val responseBody = new DeleteTopicsResponse(responseData)
trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
responseBody
}
sendResponseMaybeThrottle(request, createResponse)
}
+ val deleteTopicRequest = request.body[DeleteTopicsRequest]
+ val results = new DeletableTopicResultSet(deleteTopicRequest.data.topicNames.size)
+ val toDelete = mutable.Set[String]()
if (!controller.isActive) {
- val results = deleteTopicRequest.topics.asScala.map { topic =>
- (topic, Errors.NOT_CONTROLLER)
- }.toMap
+ deleteTopicRequest.data.topicNames.asScala.foreach { case topic =>
+ results.add(new DeletableTopicResult()
+ .setName(topic)
+ .setErrorCode(Errors.NOT_CONTROLLER.code))
+ }
sendResponseCallback(results)
} else if (!config.deleteTopicEnable) {
val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED
- val results = deleteTopicRequest.topics.asScala.map { topic =>
- (topic, error)
- }.toMap
+ deleteTopicRequest.data.topicNames.asScala.foreach { case topic =>
+ results.add(new DeletableTopicResult()
+ .setName(topic)
+ .setErrorCode(error.code))
+ }
sendResponseCallback(results)
} else {
+ deleteTopicRequest.data.topicNames.asScala.foreach { case topic =>
+ results.add(new DeletableTopicResult()
+ .setName(topic))
+ }
+ results.asScala.foreach(topic => {
+ if (!authorize(request.session, Delete, Resource(Topic, topic.name, LITERAL)))
+ topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ else if (!metadataCache.contains(topic.name))
+ topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+ else
+ toDelete += topic.name
+ })
// If no authorized topics return immediately
- if (authorizedForDeleteTopics.isEmpty)
- sendResponseCallback(Map())
+ if (toDelete.isEmpty)
+ sendResponseCallback(results)
else {
+ def handleDeleteTopicsResults(errors: Map[String, Errors]): Unit = {
+ errors.foreach {
+ case (topicName, error) =>
+ results.find(topicName)
+ .setErrorCode(error.code)
+ }
+ sendResponseCallback(results)
+ }
+
adminManager.deleteTopics(
- deleteTopicRequest.timeout.toInt,
- authorizedForDeleteTopics,
- sendResponseCallback
+ deleteTopicRequest.data.timeoutMs.toInt,
+ toDelete,
+ handleDeleteTopicsResults
)
}
}
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 30cc161..11ee87c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
-import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData}
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -172,8 +172,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error),
- ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => Errors.forCode(resp.data().topics().find(createTopic).errorCode())),
- ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors.asScala.find(_._1 == deleteTopic).get._2),
+ ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => Errors.forCode(resp.data.topics.find(createTopic).errorCode())),
+ ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => Errors.forCode(resp.data.responses.find(deleteTopic).errorCode())),
ApiKeys.DELETE_RECORDS -> ((resp: requests.DeleteRecordsResponse) => resp.responses.get(deleteRecordsPartition).error),
ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.get(tp).error),
ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) =>
@@ -371,7 +371,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
setName(createTopic).setNumPartitions(1).
setReplicationFactor(1.toShort)).iterator))).build()
- private def deleteTopicsRequest = new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()
+ private def deleteTopicsRequest =
+ new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData()
+ .setTopicNames(Collections.singletonList(deleteTopic))
+ .setTimeoutMs(5000)).build()
private def deleteRecordsRequest = new DeleteRecordsRequest.Builder(5000, Collections.singletonMap(deleteRecordsPartition, 0L)).build()
@@ -1184,7 +1188,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
val version = ApiKeys.DELETE_TOPICS.latestVersion
val deleteResponse = DeleteTopicsResponse.parse(response, version)
- assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteResponse.errors.asScala.head._2)
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, deleteResponse.data.responses.find(deleteTopic).errorCode)
}
@Test
@@ -1194,7 +1198,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val version = ApiKeys.DELETE_TOPICS.latestVersion
val deleteResponse = DeleteTopicsResponse.parse(response, version)
- assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteResponse.errors.asScala.head._2)
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, deleteResponse.data.responses.find(deleteTopic).errorCode)
}
@Test
@@ -1204,7 +1208,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val version = ApiKeys.DELETE_TOPICS.latestVersion
val deleteResponse = DeleteTopicsResponse.parse(response, version)
- assertEquals(Errors.NONE, deleteResponse.errors.asScala.head._2)
+ assertEquals(Errors.NONE.code, deleteResponse.data.responses.find(deleteTopic).errorCode)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 4388e64..2df7528 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -19,12 +19,15 @@ package kafka.server
import kafka.network.SocketServer
import kafka.utils._
+import org.apache.kafka.common.message.DeleteTopicsRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse, MetadataRequest, MetadataResponse}
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConverters._
+import java.util.Collections
+import java.util.Arrays
class DeleteTopicsRequestTest extends BaseRequestTest {
@@ -33,20 +36,24 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
val timeout = 10000
// Single topic
createTopic("topic-1", 1, 1)
- validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("topic-1").asJava, timeout).build())
+ validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData()
+ .setTopicNames(Arrays.asList("topic-1"))
+ .setTimeoutMs(timeout)).build())
// Multi topic
createTopic("topic-3", 5, 2)
createTopic("topic-4", 1, 2)
- validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("topic-3", "topic-4").asJava, timeout).build())
+ validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData()
+ .setTopicNames(Arrays.asList("topic-3", "topic-4"))
+ .setTimeoutMs(timeout)).build())
}
private def validateValidDeleteTopicRequests(request: DeleteTopicsRequest): Unit = {
val response = sendDeleteTopicsRequest(request)
-
- val error = response.errors.values.asScala.find(_ != Errors.NONE)
- assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty)
-
- request.topics.asScala.foreach { topic =>
+ val error = response.errorCounts.asScala.find(_._1 != Errors.NONE)
+ assertTrue(s"There should be no errors, found ${response.data.responses.asScala}", error.isEmpty)
+ request.data.topicNames.asScala.foreach { topic =>
validateTopicIsDeleted(topic)
}
}
@@ -57,14 +64,18 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
val timeoutTopic = "invalid-timeout"
// Basic
- validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("invalid-topic").asJava, timeout).build(),
+ validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData()
+ .setTopicNames(Arrays.asList("invalid-topic"))
+ .setTimeoutMs(timeout)).build(),
Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION))
// Partial
createTopic("partial-topic-1", 1, 1)
- validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set(
- "partial-topic-1",
- "partial-invalid-topic").asJava, timeout).build(),
+ validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData()
+ .setTopicNames(Arrays.asList("partial-topic-1", "partial-invalid-topic"))
+ .setTimeoutMs(timeout)).build(),
Map(
"partial-topic-1" -> Errors.NONE,
"partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -74,7 +85,10 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
// Timeout
createTopic(timeoutTopic, 5, 2)
// Must be a 0ms timeout to avoid transient test failures. Even a timeout of 1ms has succeeded in the past.
- validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set(timeoutTopic).asJava, 0).build(),
+ validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData()
+ .setTopicNames(Arrays.asList(timeoutTopic))
+ .setTimeoutMs(0)).build(),
Map(timeoutTopic -> Errors.REQUEST_TIMED_OUT))
// The topic should still get deleted eventually
TestUtils.waitUntilTrue(() => !servers.head.metadataCache.contains(timeoutTopic), s"Topic $timeoutTopic is never deleted")
@@ -83,11 +97,13 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
private def validateErrorDeleteTopicRequests(request: DeleteTopicsRequest, expectedResponse: Map[String, Errors]): Unit = {
val response = sendDeleteTopicsRequest(request)
- val errors = response.errors.asScala
- assertEquals("The response size should match", expectedResponse.size, response.errors.size)
+ val errors = response.data.responses
+
+ val errorCount = response.errorCounts().asScala.foldLeft(0)(_+_._2)
+ assertEquals("The response size should match", expectedResponse.size, errorCount)
expectedResponse.foreach { case (topic, expectedError) =>
- assertEquals("The response error should match", expectedResponse(topic), errors(topic))
+ assertEquals("The response error should match", expectedResponse(topic).code, errors.find(topic).errorCode)
// If no error validate the topic was deleted
if (expectedError == Errors.NONE) {
validateTopicIsDeleted(topic)
@@ -97,11 +113,14 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
@Test
def testNotController() {
- val request = new DeleteTopicsRequest.Builder(Set("not-controller").asJava, 1000).build()
+ val request = new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData()
+ .setTopicNames(Collections.singletonList("not-controller"))
+ .setTimeoutMs(1000)).build()
val response = sendDeleteTopicsRequest(request, notControllerSocketServer)
- val error = response.errors.asScala.head._2
- assertEquals("Expected controller error when routed incorrectly", Errors.NOT_CONTROLLER, error)
+ val error = response.data.responses().find("not-controller").errorCode()
+ assertEquals("Expected controller error when routed incorrectly", Errors.NOT_CONTROLLER.code, error)
}
private def validateTopicIsDeleted(topic: String): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
index 20e30c0..7240d77 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
@@ -19,12 +19,13 @@ package kafka.server
import kafka.network.SocketServer
import kafka.utils._
+import org.apache.kafka.common.message.DeleteTopicsRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse}
import org.junit.Assert._
import org.junit.Test
-import scala.collection.JavaConverters._
+import java.util.Collections
class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
@@ -42,13 +43,19 @@ class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
@Test
def testDeleteRecordsRequest() {
val topic = "topic-1"
- val request = new DeleteTopicsRequest.Builder(Set(topic).asJava, 1000).build()
+ val request = new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData()
+ .setTopicNames(Collections.singletonList(topic))
+ .setTimeoutMs(1000)).build()
val response = sendDeleteTopicsRequest(request)
- assertEquals(Errors.TOPIC_DELETION_DISABLED, response.errors.get(topic))
+ assertEquals(Errors.TOPIC_DELETION_DISABLED.code, response.data.responses.find(topic).errorCode)
- val v2request = new DeleteTopicsRequest.Builder(Set(topic).asJava, 1000).build(2)
+ val v2request = new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData()
+ .setTopicNames(Collections.singletonList(topic))
+ .setTimeoutMs(1000)).build(2)
val v2response = sendDeleteTopicsRequest(v2request)
- assertEquals(Errors.INVALID_REQUEST, v2response.errors.get(topic))
+ assertEquals(Errors.INVALID_REQUEST.code, v2response.data.responses.find(topic).errorCode)
}
private def sendDeleteTopicsRequest(request: DeleteTopicsRequest, socketServer: SocketServer = controllerSocketServer): DeleteTopicsResponse = {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 22967e2..28f7e07 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -24,7 +24,7 @@ import kafka.security.auth._
import kafka.utils.TestUtils
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
@@ -307,7 +307,10 @@ class RequestQuotaTest extends BaseRequestTest {
}
case ApiKeys.DELETE_TOPICS =>
- new DeleteTopicsRequest.Builder(Set("topic-2").asJava, 5000)
+ new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData()
+ .setTopicNames(Collections.singletonList("topic-2"))
+ .setTimeoutMs(5000))
case ApiKeys.DELETE_RECORDS =>
new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava)
@@ -469,7 +472,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs
case ApiKeys.CREATE_TOPICS =>
new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion()).throttleTimeMs
- case ApiKeys.DELETE_TOPICS => new DeleteTopicsResponse(response).throttleTimeMs
+ case ApiKeys.DELETE_TOPICS =>
+ new DeleteTopicsResponse(response, ApiKeys.DELETE_TOPICS.latestVersion()).throttleTimeMs
case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs
case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs
case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs