Posted to jira@kafka.apache.org by "jolshan (via GitHub)" <gi...@apache.org> on 2023/04/18 19:16:34 UTC

[GitHub] [kafka] jolshan opened a new pull request, #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

jolshan opened a new pull request, #13607:
URL: https://github.com/apache/kafka/pull/13607

   Adds validation to ensure all producer IDs in a transactional/idempotent produce request are the same.
   
   Also modifies verification so that a partition is only added for verification if its records are transactional.
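   
   A minimal sketch of the producer ID validation described above (the class name here is hypothetical; the method actually added by this PR is `ProduceRequest.validateProducerIds`, quoted in the review diffs below):
   
   ```java
   import org.apache.kafka.common.InvalidRecordException;
   import org.apache.kafka.common.message.ProduceRequestData;
   import org.apache.kafka.common.record.BaseRecords;
   import org.apache.kafka.common.record.RecordBatch;
   import org.apache.kafka.common.record.Records;
   
   // Sketch: require that every batch carrying a producer ID in the request uses the same one.
   public final class ProducerIdValidationSketch {
       public static void validateProducerIds(short version, ProduceRequestData data) {
           if (version < 3)
               return;                                   // transactional ID / message format v2 only exist from v3 on
           long producerId = RecordBatch.NO_PRODUCER_ID; // -1, meaning "none seen yet"
           for (ProduceRequestData.TopicProduceData topicData : data.topicData()) {
               for (ProduceRequestData.PartitionProduceData partitionData : topicData.partitionData()) {
                   BaseRecords baseRecords = partitionData.records();
                   if (!(baseRecords instanceof Records))
                       continue;
                   for (RecordBatch batch : ((Records) baseRecords).batches()) {
                       if (!batch.hasProducerId())
                           continue;                     // plain batches may be mixed in; they are skipped
                       if (producerId == RecordBatch.NO_PRODUCER_ID)
                           producerId = batch.producerId();
                       else if (producerId != batch.producerId())
                           throw new InvalidRecordException("Produce request contains more than one producer ID");
                   }
               }
           }
       }
   }
   ```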
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




[GitHub] [kafka] artemlivshits commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "artemlivshits (via GitHub)" <gi...@apache.org>.
artemlivshits commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1171959120


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -565,6 +565,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
+      try {
+        ProduceRequest.validateProducerIds(request.header.apiVersion, produceRequest.data)
+      } catch {
+        case e: InvalidRecordException =>
+          requestHelper.sendErrorResponseMaybeThrottle(request, Errors.INVALID_RECORD.exception)

Review Comment:
   Should we use `return` here?





[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1180864382


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -642,7 +642,14 @@ class ReplicaManager(val config: KafkaConfig,
           (entriesPerPartition, Map.empty)
         else
           entriesPerPartition.partition { case (topicPartition, records) =>
-            getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+            // Produce requests (only requests that require verification) should only have one batch in "batches" but check all just to be safe.
+            val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)

Review Comment:
   It would be the other way around: we can have a producer ID but not be transactional with idempotent producers.
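   
   A small illustration of that distinction, using the `MemoryRecords` helpers quoted in the test code elsewhere in this thread (a sketch rather than the PR's code):
   
   ```java
   import org.apache.kafka.common.record.CompressionType;
   import org.apache.kafka.common.record.MemoryRecords;
   import org.apache.kafka.common.record.RecordBatch;
   import org.apache.kafka.common.record.SimpleRecord;
   
   public final class BatchFlagsSketch {
       public static void main(String[] args) {
           MemoryRecords idempotent = MemoryRecords.withIdempotentRecords(
                   CompressionType.NONE, 15L, (short) 5, 10, new SimpleRecord("a".getBytes()));
           MemoryRecords transactional = MemoryRecords.withTransactionalRecords(
                   CompressionType.NONE, 15L, (short) 5, 10, new SimpleRecord("b".getBytes()));
   
           RecordBatch idempotentBatch = idempotent.firstBatch();
           RecordBatch transactionalBatch = transactional.firstBatch();
   
           // Idempotent batch: producer ID present, transactional flag unset.
           System.out.println(idempotentBatch.hasProducerId());      // true
           System.out.println(idempotentBatch.isTransactional());    // false
   
           // Transactional batch: both set, which is why the filter keys off isTransactional.
           System.out.println(transactionalBatch.hasProducerId());   // true
           System.out.println(transactionalBatch.isTransactional()); // true
       }
   }
   ```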





[GitHub] [kafka] hachikuji commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1183173661


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -744,6 +751,8 @@ class ReplicaManager(val config: KafkaConfig,
         }
 
         // map not yet verified partitions to a request object
+        // Since verification occurs on produce requests only, and each produce request has one batch per partition, we know the producer ID is transactional

Review Comment:
   But do we know that the producerId is consistent among all transactional batches? Seems like we are assuming it below, but where is it verified?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -642,7 +642,14 @@ class ReplicaManager(val config: KafkaConfig,
           (entriesPerPartition, Map.empty)
         else
           entriesPerPartition.partition { case (topicPartition, records) =>
-            getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+            // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe.
+            val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)
+            if (!transactionalBatches.isEmpty) {

Review Comment:
   nit: `nonEmpty`?





[GitHub] [kafka] jolshan commented on pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13607:
URL: https://github.com/apache/kafka/pull/13607#issuecomment-1528168395

   One concern that came up when I was thinking about this PR is whether we can have a produce request with more than one producer ID when we overflow the epoch. Basically, I need to confirm we flush the accumulator.
   
   Alternatively, we can allow more than one producer ID -- the main concern was that the verification could send the wrong one and hit an invalid producer ID mapping error. Then we would have to retry.




[GitHub] [kafka] jolshan commented on pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13607:
URL: https://github.com/apache/kafka/pull/13607#issuecomment-1533922748

   I haven't updated the base in a while, so I'm merging it in so that we can hopefully get a cleaner build.




[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1181887737


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -744,6 +751,7 @@ class ReplicaManager(val config: KafkaConfig,
         }
 
         // map not yet verified partitions to a request object
+        // verification only occurs on produce requests and those will always have one batch for versions that support transactions.
         val batchInfo = notYetVerifiedEntriesPerPartition.head._2.firstBatch()

Review Comment:
   I assumed this batch would be transactional, but it may not be. Need to fix.





[GitHub] [kafka] jolshan commented on pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13607:
URL: https://github.com/apache/kafka/pull/13607#issuecomment-1515111133

   I believe the connect mirror failures are known -- so no issues on the testing front.




[GitHub] [kafka] kirktrue commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1180838837


##########
clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java:
##########
@@ -238,6 +238,37 @@ public void testV6AndBelowCannotUseZStdCompression() {
         // Works fine with current version (>= 7)
         ProduceRequest.forCurrentMagic(produceData);
     }
+    
+    @Test
+    public void testNoMixedProducerIds() {
+        final long producerId1 = 15L;
+        final long producerId2 = 16L;
+        final short producerEpoch = 5;
+        final int sequence = 10;
+
+        final MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
+                new SimpleRecord("foo".getBytes()));
+        final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId1,
+                producerEpoch, sequence, new SimpleRecord("bar".getBytes()));
+        final MemoryRecords idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId2,
+                producerEpoch, sequence, new SimpleRecord("bee".getBytes()));
+
+
+        ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(
+                new ProduceRequestData()
+                        .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
+                                new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(records))),
+                                new ProduceRequestData.TopicProduceData().setName("bar").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(txnRecords))),
+                                new ProduceRequestData.TopicProduceData().setName("bee").setPartitionData(Collections.singletonList(

Review Comment:
   Please change `bee` to `baz` 😛



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -222,6 +223,27 @@ public void clearPartitionRecords() {
         partitionSizes();
         data = null;
     }
+    
+    public static void validateProducerIds(short version, ProduceRequestData data) {
+        if (version >= 3) {
+            long producerId = -1;
+            for (ProduceRequestData.TopicProduceData topicData : data.topicData()) {
+                for (ProduceRequestData.PartitionProduceData partitionData : topicData.partitionData()) {
+                    BaseRecords baseRecords = partitionData.records();
+                    if (baseRecords instanceof Records) {
+                        Records records = (Records) baseRecords;
+                        for (RecordBatch batch : records.batches()) {
+                            if (producerId == -1 && batch.hasProducerId())

Review Comment:
   Is it an error if there's one batch with a producer ID and another without one?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -642,7 +642,14 @@ class ReplicaManager(val config: KafkaConfig,
           (entriesPerPartition, Map.empty)
         else
           entriesPerPartition.partition { case (topicPartition, records) =>
-            getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+            // Produce requests (only requests that require verification) should only have one batch in "batches" but check all just to be safe.
+            val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)

Review Comment:
   Is it valid for `isTransactional` to be `true` but `hasProducerId` to be `false`?



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -222,6 +223,27 @@ public void clearPartitionRecords() {
         partitionSizes();
         data = null;
     }
+    
+    public static void validateProducerIds(short version, ProduceRequestData data) {
+        if (version >= 3) {

Review Comment:
   For the uninitiated, can we move and/or comment why version `3` is special?



##########
clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java:
##########
@@ -238,6 +238,37 @@ public void testV6AndBelowCannotUseZStdCompression() {
         // Works fine with current version (>= 7)
         ProduceRequest.forCurrentMagic(produceData);
     }
+    
+    @Test
+    public void testNoMixedProducerIds() {
+        final long producerId1 = 15L;
+        final long producerId2 = 16L;
+        final short producerEpoch = 5;
+        final int sequence = 10;
+
+        final MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
+                new SimpleRecord("foo".getBytes()));
+        final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId1,
+                producerEpoch, sequence, new SimpleRecord("bar".getBytes()));
+        final MemoryRecords idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId2,
+                producerEpoch, sequence, new SimpleRecord("bee".getBytes()));
+
+
+        ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(
+                new ProduceRequestData()
+                        .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
+                                new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(records))),
+                                new ProduceRequestData.TopicProduceData().setName("bar").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(txnRecords))),
+                                new ProduceRequestData.TopicProduceData().setName("bee").setPartitionData(Collections.singletonList(
+                                        new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(idempotentRecords))))
+                        .iterator()))
+                .setAcks((short) 1)
+                .setTimeoutMs(5000));
+        IntStream.range(3, ApiKeys.PRODUCE.latestVersion())

Review Comment:
   Another place where knowing why version three is special would be helpful (for me).





[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1181887737


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -744,6 +751,7 @@ class ReplicaManager(val config: KafkaConfig,
         }
 
         // map not yet verified partitions to a request object
+        // verification only occurs on produce requests and those will always have one batch for versions that support transactions.
         val batchInfo = notYetVerifiedEntriesPerPartition.head._2.firstBatch()

Review Comment:
   Note: notYetVerifiedEntries should always be transactional :) 





[GitHub] [kafka] hachikuji commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1184131575


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -637,17 +637,31 @@ class ReplicaManager(val config: KafkaConfig,
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       
+      val transactionalProducerIds = mutable.HashSet[Long]()
       val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition) = 
         if (transactionStatePartition.isEmpty || !config.transactionPartitionVerificationEnable)
           (entriesPerPartition, Map.empty)
-        else
+        else {
           entriesPerPartition.partition { case (topicPartition, records) =>
-            getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+            // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe.
+            val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)
+            transactionalBatches.map(_.producerId()).toSet.foreach(transactionalProducerIds.add(_))
+            if (transactionalBatches.nonEmpty) {
+              getPartitionOrException(topicPartition).hasOngoingTransaction(transactionalBatches.head.producerId)
+            } else { 
+              // If there is no producer ID in the batches, no need to verify.
+              true
+            }
           }
+        }
+      // We should have exactly one producer ID for transactional records
+      if (transactionalProducerIds.size > 1) {
+        throw new InvalidRecordException("Transactional records contained more than one producer ID")

Review Comment:
   Would it make sense to return `InvalidProducerIdMapping`?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -637,17 +637,31 @@ class ReplicaManager(val config: KafkaConfig,
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       
+      val transactionalProducerIds = mutable.HashSet[Long]()
       val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition) = 
         if (transactionStatePartition.isEmpty || !config.transactionPartitionVerificationEnable)
           (entriesPerPartition, Map.empty)
-        else
+        else {
           entriesPerPartition.partition { case (topicPartition, records) =>
-            getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+            // Produce requests (only requests that require verification) should only have one batch per partition in "batches" but check all just to be safe.
+            val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional)
+            transactionalBatches.map(_.producerId()).toSet.foreach(transactionalProducerIds.add(_))

Review Comment:
   nit: is the `toSet` necessary? Maybe we can simplify:
   ```scala
   transactionalBatches.foreach(batch => transactionalProducerIds.add(batch.producerId))
   ```





[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1172068360


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -565,6 +565,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
         return
       }
+      try {
+        ProduceRequest.validateProducerIds(request.header.apiVersion, produceRequest.data)
+      } catch {
+        case e: InvalidRecordException =>
+          requestHelper.sendErrorResponseMaybeThrottle(request, Errors.INVALID_RECORD.exception)

Review Comment:
   Yes, good point.
   





[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1180864129


##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -222,6 +223,27 @@ public void clearPartitionRecords() {
         partitionSizes();
         data = null;
     }
+    
+    public static void validateProducerIds(short version, ProduceRequestData data) {
+        if (version >= 3) {
+            long producerId = -1;
+            for (ProduceRequestData.TopicProduceData topicData : data.topicData()) {
+                for (ProduceRequestData.PartitionProduceData partitionData : topicData.partitionData()) {
+                    BaseRecords baseRecords = partitionData.records();
+                    if (baseRecords instanceof Records) {
+                        Records records = (Records) baseRecords;
+                        for (RecordBatch batch : records.batches()) {
+                            if (producerId == -1 && batch.hasProducerId())

Review Comment:
   Based on some of the ProduceRequest tests I found, we allow some batches with producer IDs and some without. 
   See `testMixedIdempotentData`.
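   
   For instance, something along these lines, mirroring the quoted test builders, would be accepted, since batches without a producer ID are simply skipped (the `ProducerIdValidationSketch` call refers to the hypothetical sketch earlier in this thread, not the merged method):
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import org.apache.kafka.common.message.ProduceRequestData;
   import org.apache.kafka.common.record.CompressionType;
   import org.apache.kafka.common.record.MemoryRecords;
   import org.apache.kafka.common.record.SimpleRecord;
   
   public final class MixedIdempotentSketch {
       public static void main(String[] args) {
           MemoryRecords plain = MemoryRecords.withRecords(CompressionType.NONE,
                   new SimpleRecord("foo".getBytes()));
           MemoryRecords idempotent = MemoryRecords.withIdempotentRecords(CompressionType.NONE,
                   15L, (short) 5, 10, new SimpleRecord("bar".getBytes()));
   
           // One partition with a plain batch, one with an idempotent batch.
           ProduceRequestData data = new ProduceRequestData()
                   .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
                           new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(
                                   Collections.singletonList(new ProduceRequestData.PartitionProduceData()
                                           .setIndex(0).setRecords(plain))),
                           new ProduceRequestData.TopicProduceData().setName("bar").setPartitionData(
                                   Collections.singletonList(new ProduceRequestData.PartitionProduceData()
                                           .setIndex(0).setRecords(idempotent)))).iterator()))
                   .setAcks((short) 1)
                   .setTimeoutMs(5000);
   
           // At most one distinct producer ID appears across all batches, so the check passes.
           ProducerIdValidationSketch.validateProducerIds((short) 3, data);
       }
   }
   ```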





[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1180865066


##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -222,6 +223,27 @@ public void clearPartitionRecords() {
         partitionSizes();
         data = null;
     }
+    
+    public static void validateProducerIds(short version, ProduceRequestData data) {
+        if (version >= 3) {

Review Comment:
   In ProduceRequest.json:
   > // Version 3 adds the transactional ID, which is used for authorization when attempting to write
   > // transactional data. Version 3 also adds support for Kafka Message Format v2.
     
   Basically, before version 3 these fields aren't used.





[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1181887737


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -744,6 +751,7 @@ class ReplicaManager(val config: KafkaConfig,
         }
 
         // map not yet verified partitions to a request object
+        // verification only occurs on produce requests and those will always have one batch for versions that support transactions.
         val batchInfo = notYetVerifiedEntriesPerPartition.head._2.firstBatch()

Review Comment:
   Since produce requests only contain one batch per partition, and we only put a partition in notYetVerified if it contains a transactional batch, this one must be transactional.





[GitHub] [kafka] jolshan commented on pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13607:
URL: https://github.com/apache/kafka/pull/13607#issuecomment-1513675548

   I already see the checkstyle issues created by my IDE, so I will fix them in a bit.
   




[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1183188070


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -744,6 +751,8 @@ class ReplicaManager(val config: KafkaConfig,
         }
 
         // map not yet verified partitions to a request object
+        // Since verification occurs on produce requests only, and each produce request has one batch per partition, we know the producer ID is transactional

Review Comment:
   I mention this flaw in the PR description, but as discussed offline, I will close the gap.





[GitHub] [kafka] jolshan merged pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan merged PR #13607:
URL: https://github.com/apache/kafka/pull/13607

