You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "guozhangwang (via GitHub)" <gi...@apache.org> on 2023/02/16 00:12:36 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

guozhangwang commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107858293


##########
clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequestTest.java:
##########
@@ -17,43 +17,138 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.util.ArrayList;
-
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class AddPartitionsToTxnRequestTest {
-
-    private static String transactionalId = "transactionalId";
+    private final String transactionalId1 = "transaction1";
+    private final String transactionalId2 = "transaction2";
     private static int producerId = 10;
     private static short producerEpoch = 1;
     private static int throttleTimeMs = 10;
+    private static TopicPartition tp0 = new TopicPartition("topic", 0);
+    private static TopicPartition tp1 = new TopicPartition("topic", 1);
 
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
     public void testConstructor(short version) {
-        List<TopicPartition> partitions = new ArrayList<>();
-        partitions.add(new TopicPartition("topic", 0));
-        partitions.add(new TopicPartition("topic", 1));
+        

Review Comment:
   Do we already have a test coverage with the old `Builder` constructor to check that `verifyOnly` would default to `false`?



##########
clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json:
##########
@@ -22,22 +22,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
-    { "name": "Results", "type": "[]AddPartitionsToTxnTopicResult", "versions": "0+",
-      "about": "The results for each topic.", "fields": [
+    { "name": "ResultsByTransaction", "type": "[]AddPartitionsToTxnResult", "versions": "4+",
+      "about": "Results categorized by transactional ID.", "fields": [
+      { "name": "TransactionalId", "type": "string", "versions": "4+", "mapKey": true, "entityType": "transactionalId",
+        "about": "The transactional id corresponding to the transaction."},
+      { "name": "TopicResults", "type": "[]AddPartitionsToTxnTopicResult", "versions": "4+",
+        "about": "The results for each topic." }
+    ]},
+    { "name": "Results", "type": "[]AddPartitionsToTxnTopicResult", "versions": "0-3",

Review Comment:
   nit: maybe rename this field from `Results` to `TransactionResultsByTopic`?



##########
clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json:
##########
@@ -22,22 +22,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
-    { "name": "Results", "type": "[]AddPartitionsToTxnTopicResult", "versions": "0+",
-      "about": "The results for each topic.", "fields": [
+    { "name": "ResultsByTransaction", "type": "[]AddPartitionsToTxnResult", "versions": "4+",
+      "about": "Results categorized by transactional ID.", "fields": [
+      { "name": "TransactionalId", "type": "string", "versions": "4+", "mapKey": true, "entityType": "transactionalId",
+        "about": "The transactional id corresponding to the transaction."},
+      { "name": "TopicResults", "type": "[]AddPartitionsToTxnTopicResult", "versions": "4+",
+        "about": "The results for each topic." }
+    ]},
+    { "name": "Results", "type": "[]AddPartitionsToTxnTopicResult", "versions": "0-3",
+      "about": "The results for each topic." }
+  ],
+  "commonStructs": [
+    { "name": "AddPartitionsToTxnTopicResult", "versions": "0+", "fields": [
       { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
         "about": "The topic name." },
-      { "name": "Results", "type": "[]AddPartitionsToTxnPartitionResult", "versions": "0+", 
-        "about": "The results for each partition", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
-          "about": "The partition indexes." },
-        { "name": "ErrorCode", "type": "int16", "versions": "0+",
-          "about": "The response error code."}
-      ]}
+      { "name": "Results", "type": "[]AddPartitionsToTxnPartitionResult", "versions": "0+",

Review Comment:
   ditto here, maybe `TopicResultsByPartition`?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -352,7 +353,12 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
               // this is an optimization: if the partitions are already in the metadata reply OK immediately
               Left(Errors.NONE)
             } else {
-              Right(coordinatorEpoch, txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds()))
+              // If verifyOnly, we should have returned in the step above. If we didn't the partitions are not present in the transaction.
+              if (verifyOnly) {
+                Left(Errors.INVALID_TXN_STATE)

Review Comment:
   +1 here, since we do have the finest error-code level at the per-partition level we can leverage it to be more detailed.
   
   As for whether we should also add a global error code: in general I'd like that idea, and I think there are a few error codes: auth, unsupported_version, unsupported_format (we also have a `UNSUPPORTED_FOR_MESSAGE_FORMAT` for that), that should always be global to save the sender time to dig into lower-leveled ones. And then for different request type there are some global errors based on just the global fields --- for this case, the only global field left now is `verifyOnly` so that we probably do not have additional error codes. I vaguely remember there's a KIP proposing such a principle but I cannot find the KIP now.
   
   If we agree this is a generally good idea then let's start with this one first :P



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org