Posted to jira@kafka.apache.org by "kirktrue (via GitHub)" <gi...@apache.org> on 2023/04/28 21:50:26 UTC

[GitHub] [kafka] kirktrue commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

kirktrue commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1180838837


##########
clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java:
##########
@@ -238,6 +238,37 @@ public void testV6AndBelowCannotUseZStdCompression() {
         // Works fine with current version (>= 7)
         ProduceRequest.forCurrentMagic(produceData);
     }
+    
+    @Test
+    public void testNoMixedProducerIds() {
+        final long producerId1 = 15L;
+        final long producerId2 = 16L;
+        final short producerEpoch = 5;
+        final int sequence = 10;
+
+        final MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
+                new SimpleRecord("foo".getBytes()));
+        final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId1,
+                producerEpoch, sequence, new SimpleRecord("bar".getBytes()));
+        final MemoryRecords idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId2,
+                producerEpoch, sequence, new SimpleRecord("bee".getBytes()));
+
+
+        ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(
+                new ProduceRequestData()
+                        .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
+                                new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(records))),
+                                new ProduceRequestData.TopicProduceData().setName("bar").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(txnRecords))),
+                                new ProduceRequestData.TopicProduceData().setName("bee").setPartitionData(Collections.singletonList(

Review Comment:
   Please change `bee` to `baz` 😛



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -222,6 +223,27 @@ public void clearPartitionRecords() {
         partitionSizes();
         data = null;
     }
+    
+    public static void validateProducerIds(short version, ProduceRequestData data) {
+        if (version >= 3) {
+            long producerId = -1;
+            for (ProduceRequestData.TopicProduceData topicData : data.topicData()) {
+                for (ProduceRequestData.PartitionProduceData partitionData : topicData.partitionData()) {
+                    BaseRecords baseRecords = partitionData.records();
+                    if (baseRecords instanceof Records) {
+                        Records records = (Records) baseRecords;
+                        for (RecordBatch batch : records.batches()) {
+                            if (producerId == -1 && batch.hasProducerId())

Review Comment:
   Is it an error if one batch has a producer ID and another one doesn't?
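   One way to answer this is to make the policy explicit in the validation loop. The sketch below is a self-contained illustration, not the PR's code: `Batch` is a hypothetical stand-in for `RecordBatch`, `-1` mirrors the "no producer ID" sentinel, and the `rejectMixed` flag is a hypothetical parameter that chooses between rejecting a request that mixes batches with and without a producer ID, or ignoring the ID-less batches.

```java
import java.util.List;

class ProducerIdCheck {
    // Stand-in for Kafka's "no producer ID" sentinel (-1).
    static final long NO_PRODUCER_ID = -1L;

    // Hypothetical minimal batch type; only the producer ID matters here.
    static final class Batch {
        final long producerId;

        Batch(long producerId) {
            this.producerId = producerId;
        }

        boolean hasProducerId() {
            return producerId > NO_PRODUCER_ID;
        }
    }

    /**
     * Returns false if two batches carry different producer IDs. When
     * rejectMixed is true, it also returns false if some batches carry a
     * producer ID and others do not; otherwise ID-less batches are ignored.
     */
    static boolean isValid(List<Batch> batches, boolean rejectMixed) {
        long seenId = NO_PRODUCER_ID;
        boolean sawBatchWithoutId = false;
        for (Batch batch : batches) {
            if (!batch.hasProducerId()) {
                sawBatchWithoutId = true;
            } else if (seenId == NO_PRODUCER_ID) {
                seenId = batch.producerId; // first producer ID encountered
            } else if (seenId != batch.producerId) {
                return false; // two different producer IDs in one request
            }
        }
        boolean sawBatchWithId = seenId != NO_PRODUCER_ID;
        return !(rejectMixed && sawBatchWithId && sawBatchWithoutId);
    }
}
```

   Which behaviour is correct is exactly the open question; the flag only makes the choice visible instead of leaving it implicit in the `producerId == -1` check.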



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -642,7 +642,14 @@ class ReplicaManager(val config: KafkaConfig,
           (entriesPerPartition, Map.empty)
         else
           entriesPerPartition.partition { case (topicPartition, records) =>
-            getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+            // Produce requests (only requests that require verification) should only have one batch in "batches" but check all just to be safe.
+            val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)

Review Comment:
   Is it valid for `isTransactional` to be `true` but `hasProducerId` to be `false`?
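   If that combination can never reach this code (for example, because it is rejected earlier; an assumption not verified here), the `hasProducerId` guard is redundant. If it can, the current predicate silently drops such a batch from the verification path. A self-contained sketch contrasting the two behaviours, using a hypothetical `Batch` stand-in for `RecordBatch` and hypothetical method names:

```java
import java.util.ArrayList;
import java.util.List;

class TransactionalFilter {
    // Stand-in for Kafka's "no producer ID" sentinel (-1).
    static final long NO_PRODUCER_ID = -1L;

    // Hypothetical minimal batch type with the two properties the predicate reads.
    static final class Batch {
        final long producerId;
        final boolean transactional;

        Batch(long producerId, boolean transactional) {
            this.producerId = producerId;
            this.transactional = transactional;
        }

        boolean hasProducerId() {
            return producerId > NO_PRODUCER_ID;
        }

        boolean isTransactional() {
            return transactional;
        }
    }

    // Mirrors the predicate in the diff: a transactional batch without a
    // producer ID is silently dropped.
    static List<Batch> lenientFilter(List<Batch> batches) {
        List<Batch> result = new ArrayList<>();
        for (Batch batch : batches) {
            if (batch.hasProducerId() && batch.isTransactional()) {
                result.add(batch);
            }
        }
        return result;
    }

    // Stricter alternative: the same combination is treated as malformed input.
    static List<Batch> strictFilter(List<Batch> batches) {
        List<Batch> result = new ArrayList<>();
        for (Batch batch : batches) {
            if (batch.isTransactional()) {
                if (!batch.hasProducerId()) {
                    throw new IllegalArgumentException("transactional batch without a producer ID");
                }
                result.add(batch);
            }
        }
        return result;
    }
}
```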



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -222,6 +223,27 @@ public void clearPartitionRecords() {
         partitionSizes();
         data = null;
     }
+    
+    public static void validateProducerIds(short version, ProduceRequestData data) {
+        if (version >= 3) {

Review Comment:
   For the uninitiated, can we move this check into a named constant and/or add a comment explaining why version `3` is special?
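   To my understanding (worth double-checking against the Produce request JSON schema), Produce v3 is the version that added the transactional ID field and the first to accept only v2-format record batches (magic 2), which is the format whose batch header carries the producer ID. A sketch of naming and documenting the threshold; `ProduceVersions`, `MIN_VERSION_WITH_PRODUCER_IDS` and `requiresProducerIdValidation` are hypothetical names:

```java
final class ProduceVersions {
    /**
     * Hypothetical constant. Produce v3 added the transactional ID field and
     * only accepts record batches in the v2 format (magic 2), whose batch
     * header holds the producer ID, producer epoch and base sequence.
     * Earlier versions predate that format, so there is nothing to check.
     */
    static final short MIN_VERSION_WITH_PRODUCER_IDS = 3;

    private ProduceVersions() {}

    static boolean requiresProducerIdValidation(short version) {
        return version >= MIN_VERSION_WITH_PRODUCER_IDS;
    }
}
```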



##########
clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java:
##########
@@ -238,6 +238,37 @@ public void testV6AndBelowCannotUseZStdCompression() {
         // Works fine with current version (>= 7)
         ProduceRequest.forCurrentMagic(produceData);
     }
+    
+    @Test
+    public void testNoMixedProducerIds() {
+        final long producerId1 = 15L;
+        final long producerId2 = 16L;
+        final short producerEpoch = 5;
+        final int sequence = 10;
+
+        final MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
+                new SimpleRecord("foo".getBytes()));
+        final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId1,
+                producerEpoch, sequence, new SimpleRecord("bar".getBytes()));
+        final MemoryRecords idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId2,
+                producerEpoch, sequence, new SimpleRecord("bee".getBytes()));
+
+
+        ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(
+                new ProduceRequestData()
+                        .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
+                                new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(records))),
+                                new ProduceRequestData.TopicProduceData().setName("bar").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(txnRecords))),
+                                new ProduceRequestData.TopicProduceData().setName("bee").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(idempotentRecords))))
+                        .iterator()))
+                .setAcks((short) 1)
+                .setTimeoutMs(5000));
+        IntStream.range(3, ApiKeys.PRODUCE.latestVersion())

Review Comment:
   Another place where knowing why version `3` is special would be helpful (for me).
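   Relatedly, `IntStream.range(start, end)` excludes `end`, so as written the loop stops one short of `ApiKeys.PRODUCE.latestVersion()`. If the intent is to cover the latest version as well, `IntStream.rangeClosed` includes the upper bound. A self-contained illustration; `LATEST_VERSION` is a hypothetical stand-in with an arbitrary value, not the real latest Produce version:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class VersionRange {
    static final short MIN_VERSION_WITH_PRODUCER_IDS = 3; // hypothetical name
    static final short LATEST_VERSION = 9; // hypothetical stand-in for ApiKeys.PRODUCE.latestVersion()

    // range() is end-exclusive: yields MIN..LATEST-1, so LATEST is never tested.
    static List<Integer> exclusive() {
        return IntStream.range(MIN_VERSION_WITH_PRODUCER_IDS, LATEST_VERSION)
                .boxed()
                .collect(Collectors.toList());
    }

    // rangeClosed() is end-inclusive: yields MIN..LATEST.
    static List<Integer> inclusive() {
        return IntStream.rangeClosed(MIN_VERSION_WITH_PRODUCER_IDS, LATEST_VERSION)
                .boxed()
                .collect(Collectors.toList());
    }
}
```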



-- 
This is an automated message from the Apache Git Service.