Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/22 22:53:23 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol

guozhangwang commented on a change in pull request #8326:
URL: https://github.com/apache/kafka/pull/8326#discussion_r413374355



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
##########
@@ -17,157 +17,109 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-
 public class AddPartitionsToTxnRequest extends AbstractRequest {
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
-            TRANSACTIONAL_ID,
-            PRODUCER_ID,
-            PRODUCER_EPOCH,
-            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-                    TOPIC_NAME,
-                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))),
-                    "The partitions to add to the transaction."));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = ADD_PARTITIONS_TO_TXN_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, ADD_PARTITIONS_TO_TXN_REQUEST_V1};
-    }
+
+    public final AddPartitionsToTxnRequestData data;
 
     public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
-        private final String transactionalId;
-        private final long producerId;
-        private final short producerEpoch;
-        private final List<TopicPartition> partitions;
+        public final AddPartitionsToTxnRequestData data;
 
-        public Builder(String transactionalId, long producerId, short producerEpoch, List<TopicPartition> partitions) {
+        public Builder(final AddPartitionsToTxnRequestData data) {
             super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-            this.transactionalId = transactionalId;
-            this.producerId = producerId;
-            this.producerEpoch = producerEpoch;
-            this.partitions = partitions;
+            this.data = data;
+        }
+
+        public Builder(final String transactionalId,
+                       final long producerId,
+                       final short producerEpoch,
+                       final List<TopicPartition> partitions) {
+            super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+
+            Map<String, List<Integer>> partitionMap = new HashMap<>();
+            for (TopicPartition topicPartition : partitions) {
+                String topicName = topicPartition.topic();
+
+                List<Integer> subPartitions = partitionMap.getOrDefault(topicName,
+                    new ArrayList<>());
+                subPartitions.add(topicPartition.partition());
+                partitionMap.put(topicName, subPartitions);
+            }
+
+            AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection();
+            for (Map.Entry<String, List<Integer>> partitionEntry : partitionMap.entrySet()) {
+                topics.add(new AddPartitionsToTxnTopic()
+                               .setName(partitionEntry.getKey())
+                               .setPartitions(partitionEntry.getValue()));
+            }
+
+            this.data = new AddPartitionsToTxnRequestData()
+                            .setTransactionalId(transactionalId)
+                            .setProducerId(producerId)
+                            .setProducerEpoch(producerEpoch)
+                            .setTopics(topics);
         }
 
         @Override
         public AddPartitionsToTxnRequest build(short version) {
-            return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
+            return new AddPartitionsToTxnRequest(data, version);
         }
 
         public List<TopicPartition> partitions() {
+            return getPartitions(data);
+        }
+
+        static List<TopicPartition> getPartitions(AddPartitionsToTxnRequestData data) {
+            List<TopicPartition> partitions = new ArrayList<>();
+            for (AddPartitionsToTxnTopic topicCollection : data.topics()) {
+                for (Integer partition : topicCollection.partitions()) {
+                    partitions.add(new TopicPartition(topicCollection.name(), partition));
+                }
+            }
             return partitions;
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=AddPartitionsToTxnRequest").
-                    append(", transactionalId=").append(transactionalId).
-                    append(", producerId=").append(producerId).
-                    append(", producerEpoch=").append(producerEpoch).
-                    append(", partitions=").append(partitions).
-                    append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    private final String transactionalId;
-    private final long producerId;
-    private final short producerEpoch;
-    private final List<TopicPartition> partitions;
-
-    private AddPartitionsToTxnRequest(short version, String transactionalId, long producerId, short producerEpoch,
-                                      List<TopicPartition> partitions) {
+    public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, short version) {
         super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
-        this.transactionalId = transactionalId;
-        this.producerId = producerId;
-        this.producerEpoch = producerEpoch;
-        this.partitions = partitions;
+        this.data = data;
     }
 
     public AddPartitionsToTxnRequest(Struct struct, short version) {
         super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
-        this.transactionalId = struct.get(TRANSACTIONAL_ID);
-        this.producerId = struct.get(PRODUCER_ID);
-        this.producerEpoch = struct.get(PRODUCER_EPOCH);
-
-        List<TopicPartition> partitions = new ArrayList<>();
-        Object[] topicPartitionsArray = struct.getArray(TOPICS_KEY_NAME);
-        for (Object topicPartitionObj : topicPartitionsArray) {
-            Struct topicPartitionStruct = (Struct) topicPartitionObj;
-            String topic = topicPartitionStruct.get(TOPIC_NAME);
-            for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
-                partitions.add(new TopicPartition(topic, (Integer) partitionObj));
-            }
-        }
-        this.partitions = partitions;
-    }
-
-    public String transactionalId() {
-        return transactionalId;
-    }
-
-    public long producerId() {
-        return producerId;
-    }
-
-    public short producerEpoch() {
-        return producerEpoch;
+        this.data = new AddPartitionsToTxnRequestData(struct, version);
     }
 
     public List<TopicPartition> partitions() {
-        return partitions;
+        return Builder.getPartitions(data);

Review comment:
       Although today we only call this function once, if in the future we call it multiple times we would pay the cost each time of converting the data into a list. Could we cache the parsed list locally, so that subsequent calls can avoid `Builder.getPartitions`?
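       For illustration, a minimal sketch of the lazy caching being suggested — the `cachedPartitions` field name is hypothetical, not something in this PR:

       ```java
       // Hypothetical sketch: convert the data once and reuse the list on later calls.
       private List<TopicPartition> cachedPartitions = null;

       public List<TopicPartition> partitions() {
           if (cachedPartitions == null) {
               cachedPartitions = Builder.getPartitions(data);
           }
           return cachedPartitions;
       }
       ```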

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
##########
@@ -17,157 +17,109 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-
 public class AddPartitionsToTxnRequest extends AbstractRequest {
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
-            TRANSACTIONAL_ID,
-            PRODUCER_ID,
-            PRODUCER_EPOCH,
-            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-                    TOPIC_NAME,
-                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))),
-                    "The partitions to add to the transaction."));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = ADD_PARTITIONS_TO_TXN_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, ADD_PARTITIONS_TO_TXN_REQUEST_V1};
-    }
+
+    public final AddPartitionsToTxnRequestData data;
 
     public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
-        private final String transactionalId;
-        private final long producerId;
-        private final short producerEpoch;
-        private final List<TopicPartition> partitions;
+        public final AddPartitionsToTxnRequestData data;
 
-        public Builder(String transactionalId, long producerId, short producerEpoch, List<TopicPartition> partitions) {
+        public Builder(final AddPartitionsToTxnRequestData data) {
             super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-            this.transactionalId = transactionalId;
-            this.producerId = producerId;
-            this.producerEpoch = producerEpoch;
-            this.partitions = partitions;
+            this.data = data;
+        }
+
+        public Builder(final String transactionalId,
+                       final long producerId,
+                       final short producerEpoch,
+                       final List<TopicPartition> partitions) {
+            super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+
+            Map<String, List<Integer>> partitionMap = new HashMap<>();
+            for (TopicPartition topicPartition : partitions) {
+                String topicName = topicPartition.topic();
+
+                List<Integer> subPartitions = partitionMap.getOrDefault(topicName,

Review comment:
       nit: we can use `Map#compute` to replace the `getOrDefault` + `put` pattern.
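       A rough sketch of what that could look like, reusing the variable names from the diff above (an illustration only, not the exact change being requested):

       ```java
       Map<String, List<Integer>> partitionMap = new HashMap<>();
       for (TopicPartition topicPartition : partitions) {
           // Fold the lookup, default, and re-insert into a single compute call.
           partitionMap.compute(topicPartition.topic(), (topic, subPartitions) -> {
               if (subPartitions == null) {
                   subPartitions = new ArrayList<>();
               }
               subPartitions.add(topicPartition.partition());
               return subPartitions;
           });
       }
       ```

       (`Map#computeIfAbsent` would work equally well here and avoids the explicit null check.)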

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
##########
@@ -17,129 +17,108 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnPartitionResult;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnPartitionResultCollection;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnTopicResult;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnTopicResultCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-
+/**
+ * Possible error codes:
+ *
+ *   - {@link Errors#NOT_COORDINATOR}
+ *   - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ *   - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ *   - {@link Errors#INVALID_TXN_STATE}
+ *   - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
+ *   - {@link Errors#INVALID_PRODUCER_EPOCH}
+ *   - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
+ *   - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
+ *   - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
+ */
 public class AddPartitionsToTxnResponse extends AbstractResponse {
-    private static final String ERRORS_KEY_NAME = "errors";
-    private static final String PARTITION_ERRORS = "partition_errors";
-
-    private static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema(
-            THROTTLE_TIME_MS,
-            new Field(ERRORS_KEY_NAME, new ArrayOf(new Schema(
-                    TOPIC_NAME,
-                    new Field(PARTITION_ERRORS, new ArrayOf(new Schema(
-                            PARTITION_ID,
-                            ERROR_CODE)))))));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V1 = ADD_PARTITIONS_TO_TXN_RESPONSE_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ADD_PARTITIONS_TO_TXN_RESPONSE_V0, ADD_PARTITIONS_TO_TXN_RESPONSE_V1};
-    }
 
-    private final int throttleTimeMs;
-
-    // Possible error codes:
-    //   NotCoordinator
-    //   CoordinatorNotAvailable
-    //   CoordinatorLoadInProgress
-    //   InvalidTxnState
-    //   InvalidProducerIdMapping
-    //   TopicAuthorizationFailed
-    //   InvalidProducerEpoch
-    //   UnknownTopicOrPartition
-    //   TopicAuthorizationFailed
-    //   TransactionalIdAuthorizationFailed
-    private final Map<TopicPartition, Errors> errors;
+    public final AddPartitionsToTxnResponseData data;
 
-    public AddPartitionsToTxnResponse(int throttleTimeMs, Map<TopicPartition, Errors> errors) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.errors = errors;
+    public AddPartitionsToTxnResponse(Struct struct, short version) {
+        this.data = new AddPartitionsToTxnResponseData(struct, version);
     }
 
-    public AddPartitionsToTxnResponse(Struct struct) {
-        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
-        errors = new HashMap<>();
-        for (Object topic : struct.getArray(ERRORS_KEY_NAME)) {
-            Struct topicStruct = (Struct) topic;
-            final String topicName = topicStruct.get(TOPIC_NAME);
-            for (Object partition : topicStruct.getArray(PARTITION_ERRORS)) {
-                Struct partitionStruct = (Struct) partition;
-                TopicPartition topicPartition = new TopicPartition(topicName, partitionStruct.get(PARTITION_ID));
-                errors.put(topicPartition, Errors.forCode(partitionStruct.get(ERROR_CODE)));
-            }
+    public AddPartitionsToTxnResponse(int throttleTimeMs, Map<TopicPartition, Errors> errors) {
+
+        Map<String, AddPartitionsToTxnPartitionResultCollection> resultMap = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, Errors> entry : errors.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            String topicName = topicPartition.topic();
+
+            AddPartitionsToTxnPartitionResult partitionResult =
+                new AddPartitionsToTxnPartitionResult()
+                    .setErrorCode(entry.getValue().code())
+                    .setPartitionIndex(topicPartition.partition());
+
+            AddPartitionsToTxnPartitionResultCollection partitionResultCollection = resultMap.getOrDefault(
+                topicName, new AddPartitionsToTxnPartitionResultCollection()
+            );
+
+            partitionResultCollection.add(partitionResult);
+            resultMap.put(topicName, partitionResultCollection);
+        }
+
+        AddPartitionsToTxnTopicResultCollection topicCollection = new AddPartitionsToTxnTopicResultCollection();
+        for (Map.Entry<String, AddPartitionsToTxnPartitionResultCollection> entry : resultMap.entrySet()) {
+            topicCollection.add(new AddPartitionsToTxnTopicResult()
+                                    .setName(entry.getKey())
+                                    .setResults(entry.getValue()));
         }
+
+        this.data = new AddPartitionsToTxnResponseData()
+                        .setThrottleTimeMs(throttleTimeMs)
+                        .setResults(topicCollection);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
+        return data.throttleTimeMs();
     }
 
     public Map<TopicPartition, Errors> errors() {
-        return errors;
+        Map<TopicPartition, Errors> errorsMap = new HashMap<>();

Review comment:
       Similarly here, we can cache the result so it can be reused.
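       A minimal sketch of what caching could look like here; the `cachedErrorsMap` field is hypothetical, and the getter names are assumed to mirror the setters used in the diff above:

       ```java
       // Hypothetical sketch: convert the response data once and reuse the map on later calls.
       private Map<TopicPartition, Errors> cachedErrorsMap = null;

       public Map<TopicPartition, Errors> errors() {
           if (cachedErrorsMap == null) {
               Map<TopicPartition, Errors> errorsMap = new HashMap<>();
               for (AddPartitionsToTxnTopicResult topicResult : data.results()) {
                   for (AddPartitionsToTxnPartitionResult partitionResult : topicResult.results()) {
                       errorsMap.put(
                           new TopicPartition(topicResult.name(), partitionResult.partitionIndex()),
                           Errors.forCode(partitionResult.errorCode()));
                   }
               }
               cachedErrorsMap = errorsMap;
           }
           return cachedErrorsMap;
       }
       ```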

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
##########
@@ -17,157 +17,109 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-
 public class AddPartitionsToTxnRequest extends AbstractRequest {
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
-            TRANSACTIONAL_ID,
-            PRODUCER_ID,
-            PRODUCER_EPOCH,
-            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-                    TOPIC_NAME,
-                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))),
-                    "The partitions to add to the transaction."));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = ADD_PARTITIONS_TO_TXN_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, ADD_PARTITIONS_TO_TXN_REQUEST_V1};
-    }
+
+    public final AddPartitionsToTxnRequestData data;
 
     public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
-        private final String transactionalId;
-        private final long producerId;
-        private final short producerEpoch;
-        private final List<TopicPartition> partitions;
+        public final AddPartitionsToTxnRequestData data;
 
-        public Builder(String transactionalId, long producerId, short producerEpoch, List<TopicPartition> partitions) {
+        public Builder(final AddPartitionsToTxnRequestData data) {
             super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-            this.transactionalId = transactionalId;
-            this.producerId = producerId;
-            this.producerEpoch = producerEpoch;
-            this.partitions = partitions;
+            this.data = data;
+        }
+
+        public Builder(final String transactionalId,
+                       final long producerId,
+                       final short producerEpoch,
+                       final List<TopicPartition> partitions) {
+            super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+
+            Map<String, List<Integer>> partitionMap = new HashMap<>();
+            for (TopicPartition topicPartition : partitions) {
+                String topicName = topicPartition.topic();
+
+                List<Integer> subPartitions = partitionMap.getOrDefault(topicName,
+                    new ArrayList<>());
+                subPartitions.add(topicPartition.partition());
+                partitionMap.put(topicName, subPartitions);
+            }
+
+            AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection();
+            for (Map.Entry<String, List<Integer>> partitionEntry : partitionMap.entrySet()) {
+                topics.add(new AddPartitionsToTxnTopic()
+                               .setName(partitionEntry.getKey())
+                               .setPartitions(partitionEntry.getValue()));
+            }
+
+            this.data = new AddPartitionsToTxnRequestData()
+                            .setTransactionalId(transactionalId)
+                            .setProducerId(producerId)
+                            .setProducerEpoch(producerEpoch)
+                            .setTopics(topics);
         }
 
         @Override
         public AddPartitionsToTxnRequest build(short version) {
-            return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
+            return new AddPartitionsToTxnRequest(data, version);
         }
 
         public List<TopicPartition> partitions() {
+            return getPartitions(data);
+        }
+
+        static List<TopicPartition> getPartitions(AddPartitionsToTxnRequestData data) {
+            List<TopicPartition> partitions = new ArrayList<>();
+            for (AddPartitionsToTxnTopic topicCollection : data.topics()) {
+                for (Integer partition : topicCollection.partitions()) {
+                    partitions.add(new TopicPartition(topicCollection.name(), partition));
+                }
+            }
             return partitions;
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=AddPartitionsToTxnRequest").
-                    append(", transactionalId=").append(transactionalId).
-                    append(", producerId=").append(producerId).
-                    append(", producerEpoch=").append(producerEpoch).
-                    append(", partitions=").append(partitions).
-                    append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    private final String transactionalId;
-    private final long producerId;
-    private final short producerEpoch;
-    private final List<TopicPartition> partitions;
-
-    private AddPartitionsToTxnRequest(short version, String transactionalId, long producerId, short producerEpoch,
-                                      List<TopicPartition> partitions) {
+    public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, short version) {
         super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
-        this.transactionalId = transactionalId;
-        this.producerId = producerId;
-        this.producerEpoch = producerEpoch;
-        this.partitions = partitions;
+        this.data = data;
     }
 
     public AddPartitionsToTxnRequest(Struct struct, short version) {
         super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
-        this.transactionalId = struct.get(TRANSACTIONAL_ID);
-        this.producerId = struct.get(PRODUCER_ID);
-        this.producerEpoch = struct.get(PRODUCER_EPOCH);
-
-        List<TopicPartition> partitions = new ArrayList<>();
-        Object[] topicPartitionsArray = struct.getArray(TOPICS_KEY_NAME);
-        for (Object topicPartitionObj : topicPartitionsArray) {
-            Struct topicPartitionStruct = (Struct) topicPartitionObj;
-            String topic = topicPartitionStruct.get(TOPIC_NAME);
-            for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
-                partitions.add(new TopicPartition(topic, (Integer) partitionObj));
-            }
-        }
-        this.partitions = partitions;
-    }
-
-    public String transactionalId() {
-        return transactionalId;
-    }
-
-    public long producerId() {
-        return producerId;
-    }
-
-    public short producerEpoch() {
-        return producerEpoch;
+        this.data = new AddPartitionsToTxnRequestData(struct, version);
     }
 
     public List<TopicPartition> partitions() {
-        return partitions;
+        return Builder.getPartitions(data);
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.requestSchema(version()));
-        struct.set(TRANSACTIONAL_ID, transactionalId);
-        struct.set(PRODUCER_ID, producerId);
-        struct.set(PRODUCER_EPOCH, producerEpoch);
-
-        Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupPartitionsByTopic(partitions);
-        Object[] partitionsArray = new Object[mappedPartitions.size()];
-        int i = 0;
-        for (Map.Entry<String, List<Integer>> topicAndPartitions : mappedPartitions.entrySet()) {
-            Struct topicPartitionsStruct = struct.instance(TOPICS_KEY_NAME);
-            topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
-            topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray());
-            partitionsArray[i++] = topicPartitionsStruct;
-        }
-
-        struct.set(TOPICS_KEY_NAME, partitionsArray);
-        return struct;
+        return data.toStruct(version());
     }
 
     @Override
     public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         final HashMap<TopicPartition, Errors> errors = new HashMap<>();
-        for (TopicPartition partition : partitions) {
+        for (TopicPartition partition : Builder.getPartitions(data)) {

Review comment:
       Same here, we can cache the result of `Builder.getPartitions(data)` for re-use.
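       Assuming a cached list along the lines sketched for `partitions()` above (the `cachedPartitions` field is hypothetical), this loop could then simply iterate the cached result instead of re-parsing the data:

       ```java
       // Hypothetical sketch: iterate the lazily cached list rather than calling Builder.getPartitions again.
       for (TopicPartition partition : partitions()) {
           // ... populate the `errors` map exactly as the existing loop body does
       }
       ```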

##########
File path: core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
##########
@@ -27,9 +27,9 @@ import org.junit.{Before, Test}
 
 import scala.jdk.CollectionConverters._
 
-class AddPartitionsToTxnRequestTest extends BaseRequestTest {
-  private val topic1 = "foobartopic"
-  val numPartitions = 3
+class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {

Review comment:
       Why do we want to change the test name?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org