You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/23 02:56:53 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol

abbccdda commented on a change in pull request #8326:
URL: https://github.com/apache/kafka/pull/8326#discussion_r413471840



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
##########
@@ -17,157 +17,109 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-
 public class AddPartitionsToTxnRequest extends AbstractRequest {
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
-            TRANSACTIONAL_ID,
-            PRODUCER_ID,
-            PRODUCER_EPOCH,
-            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-                    TOPIC_NAME,
-                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))),
-                    "The partitions to add to the transaction."));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = ADD_PARTITIONS_TO_TXN_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, ADD_PARTITIONS_TO_TXN_REQUEST_V1};
-    }
+
+    public final AddPartitionsToTxnRequestData data;
 
     public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
-        private final String transactionalId;
-        private final long producerId;
-        private final short producerEpoch;
-        private final List<TopicPartition> partitions;
+        public final AddPartitionsToTxnRequestData data;
 
-        public Builder(String transactionalId, long producerId, short producerEpoch, List<TopicPartition> partitions) {
+        public Builder(final AddPartitionsToTxnRequestData data) {
             super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-            this.transactionalId = transactionalId;
-            this.producerId = producerId;
-            this.producerEpoch = producerEpoch;
-            this.partitions = partitions;
+            this.data = data;
+        }
+
+        public Builder(final String transactionalId,
+                       final long producerId,
+                       final short producerEpoch,
+                       final List<TopicPartition> partitions) {
+            super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+
+            Map<String, List<Integer>> partitionMap = new HashMap<>();
+            for (TopicPartition topicPartition : partitions) {
+                String topicName = topicPartition.topic();
+
+                List<Integer> subPartitions = partitionMap.getOrDefault(topicName,
+                    new ArrayList<>());
+                subPartitions.add(topicPartition.partition());
+                partitionMap.put(topicName, subPartitions);
+            }
+
+            AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection();
+            for (Map.Entry<String, List<Integer>> partitionEntry : partitionMap.entrySet()) {
+                topics.add(new AddPartitionsToTxnTopic()
+                               .setName(partitionEntry.getKey())
+                               .setPartitions(partitionEntry.getValue()));
+            }
+
+            this.data = new AddPartitionsToTxnRequestData()
+                            .setTransactionalId(transactionalId)
+                            .setProducerId(producerId)
+                            .setProducerEpoch(producerEpoch)
+                            .setTopics(topics);
         }
 
         @Override
         public AddPartitionsToTxnRequest build(short version) {
-            return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
+            return new AddPartitionsToTxnRequest(data, version);
         }
 
         public List<TopicPartition> partitions() {
+            return getPartitions(data);
+        }
+
+        static List<TopicPartition> getPartitions(AddPartitionsToTxnRequestData data) {
+            List<TopicPartition> partitions = new ArrayList<>();
+            for (AddPartitionsToTxnTopic topicCollection : data.topics()) {
+                for (Integer partition : topicCollection.partitions()) {
+                    partitions.add(new TopicPartition(topicCollection.name(), partition));
+                }
+            }
             return partitions;
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=AddPartitionsToTxnRequest").
-                    append(", transactionalId=").append(transactionalId).
-                    append(", producerId=").append(producerId).
-                    append(", producerEpoch=").append(producerEpoch).
-                    append(", partitions=").append(partitions).
-                    append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    private final String transactionalId;
-    private final long producerId;
-    private final short producerEpoch;
-    private final List<TopicPartition> partitions;
-
-    private AddPartitionsToTxnRequest(short version, String transactionalId, long producerId, short producerEpoch,
-                                      List<TopicPartition> partitions) {
+    public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, short version) {
         super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
-        this.transactionalId = transactionalId;
-        this.producerId = producerId;
-        this.producerEpoch = producerEpoch;
-        this.partitions = partitions;
+        this.data = data;
     }
 
     public AddPartitionsToTxnRequest(Struct struct, short version) {
         super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
-        this.transactionalId = struct.get(TRANSACTIONAL_ID);
-        this.producerId = struct.get(PRODUCER_ID);
-        this.producerEpoch = struct.get(PRODUCER_EPOCH);
-
-        List<TopicPartition> partitions = new ArrayList<>();
-        Object[] topicPartitionsArray = struct.getArray(TOPICS_KEY_NAME);
-        for (Object topicPartitionObj : topicPartitionsArray) {
-            Struct topicPartitionStruct = (Struct) topicPartitionObj;
-            String topic = topicPartitionStruct.get(TOPIC_NAME);
-            for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
-                partitions.add(new TopicPartition(topic, (Integer) partitionObj));
-            }
-        }
-        this.partitions = partitions;
-    }
-
-    public String transactionalId() {
-        return transactionalId;
-    }
-
-    public long producerId() {
-        return producerId;
-    }
-
-    public short producerEpoch() {
-        return producerEpoch;
+        this.data = new AddPartitionsToTxnRequestData(struct, version);
     }
 
     public List<TopicPartition> partitions() {
-        return partitions;
+        return Builder.getPartitions(data);

Review comment:
       Sg




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org