Posted to jira@kafka.apache.org by "jolshan (via GitHub)" <gi...@apache.org> on 2023/04/18 20:40:21 UTC

[GitHub] [kafka] jolshan opened a new pull request, #13608: KAFKA-14884: Include check transaction is still ongoing right before append

jolshan opened a new pull request, #13608:
URL: https://github.com/apache/kafka/pull/13608

   We will need to pick up the changes in KAFKA-14916 (right now we assume the producer ID is shared across all batches), but I wanted to get a WIP draft out for general ideas.
   
   Will need to add the commented-out section in analyzeAndValidateProducerState, as well as tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] artemlivshits commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "artemlivshits (via GitHub)" <gi...@apache.org>.
artemlivshits commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197145142


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+    
+    private VerificationState verificationState;
+    
+    // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+    
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }

Review Comment:
   As discussed offline, this race can cause hanging transactions:
   1. producer properly adds partition to transaction
   2. producer has trouble sending messages, retries a few times (all messages have the same epoch + sequence)
   3. partition leader gets first message, sets state to VERIFYING, issues request to TC
   4. TC sees the open transaction, replies with success
   5. producer gives up and aborts transaction
   6. partition leader gets abort marker, sets state to EMPTY
   7. partition leader gets second message, sets state to VERIFYING, issues request to TC
   8. successful reply from step 4 finally arrives, sees the state is VERIFYING, sets to VERIFIED
   9. the first message is written on top of the abort marker, resulting in a "hanging" transaction
   
   the issue is that the transition VERIFYING -> EMPTY -> VERIFYING is indistinguishable from staying in VERIFYING, the classic ABA problem: https://en.wikipedia.org/wiki/ABA_problem
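   For illustration, a minimal self-contained sketch of that race (hypothetical names, not the PR's actual code): a stale verification reply cannot tell that the state went VERIFYING -> EMPTY -> VERIFYING in between, so its compare-and-set still succeeds.
   ```java
   enum VerificationState { EMPTY, VERIFYING, VERIFIED }

   class Entry {
       VerificationState state = VerificationState.EMPTY;

       // Same shape as the compareAndSetVerificationState in the diff, minus the epoch check.
       synchronized boolean compareAndSet(VerificationState expected, VerificationState next) {
           if (state == expected) {
               state = next;
               return true;
           }
           return false;
       }
   }

   class AbaDemo {
       public static void main(String[] args) {
           Entry entry = new Entry();
           entry.compareAndSet(VerificationState.EMPTY, VerificationState.VERIFYING);   // step 3: first message arrives
           entry.compareAndSet(VerificationState.VERIFYING, VerificationState.EMPTY);   // step 6: abort marker
           entry.compareAndSet(VerificationState.EMPTY, VerificationState.VERIFYING);   // step 7: second message arrives
           // Step 8: the stale reply for the *first* message still succeeds.
           boolean staleReplySucceeds =
               entry.compareAndSet(VerificationState.VERIFYING, VerificationState.VERIFIED);
           System.out.println(staleReplySucceeds); // prints true -- the ABA problem
       }
   }
   ```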



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   We only need tentative state when we don't have an entry for something that we've written. I'm not sure that doing something we don't need, just so we don't rely on it later, makes the code simpler :-).





[GitHub] [kafka] hachikuji commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1187662801


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -103,28 +121,44 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
             return false;
         }
     }
+    
+    public void maybeUpdateTentaitiveSequence(int sequence) {

Review Comment:
   typo: Tentaitive



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -103,28 +121,44 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
             return false;
         }
     }
+    
+    public void maybeUpdateTentaitiveSequence(int sequence) {
+        if (batchMetadata.isEmpty() && (!this.tentativeSequence.isPresent() || this.tentativeSequence.getAsInt() > sequence))
+            this.tentativeSequence = OptionalInt.of(sequence);
+    }
 
     private void addBatchMetadata(BatchMetadata batch) {
+        // When appending a batch, we no longer need tentative sequence.
+        this.tentativeSequence = OptionalInt.empty();
         if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) batchMetadata.removeFirst();
         batchMetadata.add(batch);
     }
+    
+    public boolean compareAndSetVerificationState(short expectedProducerEpoch, VerificationState expectedVerificationState, VerificationState newVerificationState) {
+        if (expectedProducerEpoch == this.producerEpoch && verificationState == expectedVerificationState) {
+            this.verificationState = newVerificationState;
+            return true;
+        }
+        return false;
+    }
 
     public void update(ProducerStateEntry nextEntry) {
-        update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch, nextEntry.lastTimestamp, nextEntry.batchMetadata, nextEntry.currentTxnFirstOffset);
+        update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch, nextEntry.lastTimestamp, nextEntry.batchMetadata, nextEntry.currentTxnFirstOffset, nextEntry.verificationState);
     }
 
     public void update(short producerEpoch, int coordinatorEpoch, long lastTimestamp) {
-        update(producerEpoch, coordinatorEpoch, lastTimestamp, new ArrayDeque<>(0), OptionalLong.empty());
+        update(producerEpoch, coordinatorEpoch, lastTimestamp, new ArrayDeque<>(0), OptionalLong.empty(), VerificationState.EMPTY);
     }
 
     private void update(short producerEpoch, int coordinatorEpoch, long lastTimestamp, Deque<BatchMetadata> batchMetadata,
-                        OptionalLong currentTxnFirstOffset) {
+                        OptionalLong currentTxnFirstOffset, VerificationState verificationState) {

Review Comment:
   nit: when arg lists go above 2 or 3, it's helpful to start putting each argument on a separate line:
   ```java
   private void update(
     short producerEpoch,
     int coordinatorEpoch,
     long lastTimestamp,
     ...
   ) {
   ````



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+    
+    private VerificationState verificationState;
+    
+    // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+    
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }
 
     public static ProducerStateEntry empty(long producerId) {
-        return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty());
+        return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty(), VerificationState.EMPTY, OptionalInt.empty());
+    }
+
+    public static ProducerStateEntry forVerification(long producerId, short producerEpoch, long milliseconds) {

Review Comment:
   nit: `milliseconds` -> `lastTimestamp`?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+    
+    public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {
+        ProducerStateEntry entry;
+        if (producers.containsKey(producerId)) {

Review Comment:
   nit: usually we would call `get` and check for null (saves one hash lookup)
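   For reference, a minimal sketch of the single-lookup pattern being suggested (placeholder types so it stands alone; not the PR's actual code):
   ```java
   import java.util.HashMap;
   import java.util.Map;

   class ProducerLookupSketch {
       // Stand-in for ProducerStateEntry, just to keep the sketch self-contained.
       static class Entry {}

       private final Map<Long, Entry> producers = new HashMap<>();

       Entry entryForVerification(long producerId) {
           // One hash lookup via get(), instead of containsKey() followed by get().
           Entry entry = producers.get(producerId);
           if (entry == null) {
               entry = new Entry();
               producers.put(producerId, entry);
           }
           return entry;
       }
   }
   ```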



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,9 +579,33 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
-    val entry = producerStateManager.activeProducers.get(producerId)
-    entry != null && entry.currentTxnFirstOffset.isPresent
+  def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, baseSequence: Int): Boolean = lock synchronized {
+    val entry = producerStateManager.entryForVerification(producerId, producerEpoch, baseSequence)
+    (!entry.currentTxnFirstOffset.isPresent) &&
+      (entry.compareAndSetVerificationState(producerEpoch, ProducerStateEntry.VerificationState.EMPTY, ProducerStateEntry.VerificationState.VERIFYING) ||
+        entry.verificationState() == ProducerStateEntry.VerificationState.VERIFYING)
+  }
+  
+  def compareAndSetVerificationState(producerId: Long,
+                                     producerEpoch: Short,
+                                     baseSequence: Int,
+                                     expectedVerificationState: ProducerStateEntry.VerificationState,
+                                     newVerificationState: ProducerStateEntry.VerificationState): Unit = { lock synchronized {

Review Comment:
   nit: can drop braces around `lock synchronized`?





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1187719600


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,9 +579,33 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
-    val entry = producerStateManager.activeProducers.get(producerId)
-    entry != null && entry.currentTxnFirstOffset.isPresent
+  def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, baseSequence: Int): Boolean = lock synchronized {
+    val entry = producerStateManager.entryForVerification(producerId, producerEpoch, baseSequence)
+    (!entry.currentTxnFirstOffset.isPresent) &&
+      (entry.compareAndSetVerificationState(producerEpoch, ProducerStateEntry.VerificationState.EMPTY, ProducerStateEntry.VerificationState.VERIFYING) ||
+        entry.verificationState() == ProducerStateEntry.VerificationState.VERIFYING)
+  }
+  
+  def compareAndSetVerificationState(producerId: Long,
+                                     producerEpoch: Short,
+                                     baseSequence: Int,
+                                     expectedVerificationState: ProducerStateEntry.VerificationState,
+                                     newVerificationState: ProducerStateEntry.VerificationState): Unit = { lock synchronized {

Review Comment:
   Those are the method braces. Should I just put them on a new line so it's clearer?





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205982602


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+    
+    private VerificationState verificationState;
+    
+    // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+    
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }

Review Comment:
   I've addressed this issue by introducing a verification object that is created on the first attempt to verify and removed when a marker is written. 
   
   When verification is needed, we pass this object through and check it under the log lock in the append path.
   In the steps above, step 6 will clear this object and step 7 will set a new one, so step 8 will not succeed and will error out. 
   
   When verification is not needed (already verified), we rely on firstTxnOffset being present before appending to the log.
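   A rough sketch of that lifecycle, assuming the guard is just an object compared by reference under the log lock (hypothetical names; the real logic lives in UnifiedLog and ProducerStateEntry):
   ```java
   class VerificationGuardSketch {
       private final Object lock = new Object();
       private Object verificationGuard;              // null when no verification is in flight

       Object maybeStartVerification() {
           synchronized (lock) {
               if (verificationGuard == null)
                   verificationGuard = new Object(); // created on the first attempt to verify
               return verificationGuard;
           }
       }

       boolean appendAllowed(Object callerGuard) {
           synchronized (lock) {
               // Reference equality: a guard created before an abort marker cleared the field
               // can never equal a guard created afterwards, so the stale append (step 8) fails.
               return callerGuard != null && callerGuard == verificationGuard;
           }
       }

       void onControlMarker() {
           synchronized (lock) {
               verificationGuard = null;              // step 6 clears the guard
           }
       }
   }
   ```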





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207330097


##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
     nodesToTransactions.synchronized {
       // Check if we have already have either node or individual transaction. Add the Node if it isn't there.
-      val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+      val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
         new TransactionDataAndCallbacks(
           new AddPartitionsToTxnTransactionCollection(1),
           mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-      val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+      val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
 
-      // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and 
-      // reconnected so return the retriable network exception.
-      if (currentTransactionData != null) {
-        val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch())
-          Errors.INVALID_PRODUCER_EPOCH
-        else 
-          Errors.NETWORK_EXCEPTION
-        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-        currentTransactionData.topics().forEach { topic =>
-          topic.partitions().forEach { partition =>
-            topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
-          }
+      // There are 3 cases if we already have existing data
+      // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced
+      // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception 
+      // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify
+      if (existingTransactionData != null) {
+        if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) {
+            val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch())
+              Errors.INVALID_PRODUCER_EPOCH
+            else
+              Errors.NETWORK_EXCEPTION
+          val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId())
+          existingNodeAndTransactionData.transactionData.remove(transactionData)
+          oldCallback(topicPartitionsToError(existingTransactionData, error))
+        } else {
+          // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately.
+          callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH))
+          return
         }
-        val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
-        currentNodeAndTransactionData.transactionData.remove(transactionData)
-        oldCallback(topicPartitionsToError.toMap)
       }
-      currentNodeAndTransactionData.transactionData.add(transactionData)
-      currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+

Review Comment:
   yes - we didn't really cover these epoch cases before, and I thought it would be good to include them for completeness.





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197989841


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   There is also currently no great way to check whether this entry is undergoing verification or is just the first entry placed in the cache without any batch data yet. Do you have a suggestion for what I could do there?
   
   We would somehow need to distinguish an epoch bump from a brand-new entry.





[GitHub] [kafka] jolshan commented on pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13608:
URL: https://github.com/apache/kafka/pull/13608#issuecomment-1570723482

   Going to close this in favor of breaking up the PRs.




[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1185544285


##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -3273,17 +3274,46 @@ class PartitionTest extends AbstractPartitionTest {
       baseOffset = 0L,
       producerId = producerId)
     partition.appendRecordsToLeader(idempotentRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
-    assertFalse(partition.hasOngoingTransaction(producerId))
+    assertEquals(OptionalLong.empty(), txnFirstOffset(producerId))

Review Comment:
   will fix duplicate code.





[GitHub] [kafka] artemlivshits commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "artemlivshits (via GitHub)" <gi...@apache.org>.
artemlivshits commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1206199912


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,10 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, baseSequence: Int): Optional[VerificationState] = lock synchronized {
+    val entry = producerStateManager.entryForVerification(producerId, producerEpoch, baseSequence)
+    if (entry.currentTxnFirstOffset.isPresent) {

Review Comment:
   Maybe add a comment why we don't need verification if currentTxnFirstOffset.isPresent.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -103,28 +115,57 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
             return false;
         }
     }
+    
+    public void maybeAddVerificationState() {
+        // If we already have a verification state, we can reuse it. This is because we know this is the same transaction 
+        // as the state is cleared upon writing a control marker.
+        if (!this.verificationState.isPresent())
+            this.verificationState = Optional.of(new VerificationState());

Review Comment:
   This creates an Optional object that points to a VerificationState object, so we get an extra object for every producer entry. We just need a plain Object-typed field to avoid the extra overhead.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       validateAndAssignOffsets = false,
       leaderEpoch = -1,
       requestLocal = None,
+      verificationState = Optional.empty(),

Review Comment:
   Do we ever go through verification logic for followers?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1002,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state

Review Comment:
   Maybe add more comments on how we validate transaction state.  Also maybe not here but at least somewhere we should have a detailed comment about the race condition we're addressing and specifically how the verificationState solves the ABA problem.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationState.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class serves as a unique object to ensure the same transaction is being verified.
+ * When verification starts, this object is created and checked before append to ensure the producer state entry
+ * is not modified (via ending the transaction) before the record is appended.
+ */
+public class VerificationState {

Review Comment:
   Do we need a separate class for that?  I think we could just use Object, because we just compare references, not values.



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
     nodesToTransactions.synchronized {
       // Check if we have already have either node or individual transaction. Add the Node if it isn't there.
-      val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+      val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
         new TransactionDataAndCallbacks(
           new AddPartitionsToTxnTransactionCollection(1),
           mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-      val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+      val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
 
-      // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and 
-      // reconnected so return the retriable network exception.
-      if (currentTransactionData != null) {
-        val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch())
-          Errors.INVALID_PRODUCER_EPOCH
-        else 
-          Errors.NETWORK_EXCEPTION
-        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-        currentTransactionData.topics().forEach { topic =>
-          topic.partitions().forEach { partition =>
-            topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
-          }
+      // There are 3 cases if we already have existing data
+      // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced
+      // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception 
+      // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify
+      if (existingTransactionData != null) {
+        if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) {
+            val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch())
+              Errors.INVALID_PRODUCER_EPOCH
+            else
+              Errors.NETWORK_EXCEPTION
+          val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId())
+          existingNodeAndTransactionData.transactionData.remove(transactionData)
+          oldCallback(topicPartitionsToError(existingTransactionData, error))
+        } else {
+          // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately.
+          callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH))
+          return
         }
-        val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
-        currentNodeAndTransactionData.transactionData.remove(transactionData)
-        oldCallback(topicPartitionsToError.toMap)
       }
-      currentNodeAndTransactionData.transactionData.add(transactionData)
-      currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+

Review Comment:
   Just to clarify -- this change isn't about "transient" state or handling race with abort, but just an unrelated issue?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -103,28 +115,57 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
             return false;
         }
     }
+    
+    public void maybeAddVerificationState() {
+        // If we already have a verification state, we can reuse it. This is because we know this is the same transaction 
+        // as the state is cleared upon writing a control marker.
+        if (!this.verificationState.isPresent())
+            this.verificationState = Optional.of(new VerificationState());
+    }
+
+    // We only set tentative sequence if this entry has never had any batch metadata written to it. (We do not need to store if we've just bumped epoch.)
+    // It is used to avoid OutOfOrderSequenceExceptions when we saw a lower sequence during transaction verification.

Review Comment:
   Maybe elaborate on the problem we're trying to address (sequence of steps that leads to the problem).



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
       val sTime = time.milliseconds
       
       val transactionalProducerIds = mutable.HashSet[Long]()
+      var verificationState: Optional[VerificationState] = Optional.empty()

Review Comment:
   I think this should be per partition -- each log has its own producer state and its own verificationState. Do we have a unit test that tries to append to multiple partitions? I would expect it to fail, because each producer state would have its own verificationState object, but we pass only whichever one happened to be last, so for the others it would look like a race.
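   To illustrate the per-partition point, a minimal sketch of keeping one verification guard per partition rather than a single variable for the whole request (the map and methods are hypothetical; only TopicPartition is the real Kafka class):
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.common.TopicPartition;

   class PerPartitionVerification {
       // Each log keeps its own producer state, so each partition needs its own guard.
       private final Map<TopicPartition, Object> verificationGuards = new HashMap<>();

       void record(TopicPartition tp, Object guard) {
           verificationGuards.put(tp, guard);
       }

       Object guardFor(TopicPartition tp) {
           return verificationGuards.get(tp); // null if verification was never started for tp
       }
   }
   ```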



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+          // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+            if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+              throw new InvalidRecordException("Record was not part of an ongoing transaction")
+            } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   I think verification check belongs here, and sequencing would be better done in ProducerAppendInfo.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1002,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state
+          // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+            if (!hasOngoingTransaction(batch.producerId) && (verificationStateOpt != verificationState(batch.producerId) || !verificationStateOpt.isPresent)) {
+              throw new InvalidRecordException("Record was not part of an ongoing transaction")

Review Comment:
   Looks like we already have the producer state in maybeLastEntry, so we don't need additional lookups and locks; we can just check the state of maybeLastEntry.





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205777679


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   I dug into this a bit more. We actually do check sequence on bumped epoch! 
   
   ```
    if (producerEpoch != updatedEntry.producerEpoch()) {
               if (appendFirstSeq != 0) {
                   if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) {
                       throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId +
                               "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), "
                               + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)");
   ```
   
   Basically we check when the updated epoch is different from the epoch we want to use. The tricky part is that we update the epoch when creating the verification state, so we just need to do that check there somehow. I will look into it.






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1199479050


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+    
+    public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {

Review Comment:
   Actually, hmm, I guess we would also need to update when we are in the verifying state. I'll think on this a bit more, but it may be possible.





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205980757


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   I dug in further and interestingly we do a different check on bumped epochs after markers. We do have a second check here:
   ```
               if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq) || currentEntry.tentativeSequence().isPresent())) {
                   throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
                           "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
                           " (incoming seq. number), " + currentLastSeq + " (current end sequence number)");
               }
   ```
   
   I've added the tentative sequence change because this PR actually changes this behavior on first verification. We put the producer epoch in the entry, so that first case no longer applies.            





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197988031


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;

Review Comment:
   Currently no. I was modeling it on the method above, but I can change it to void.





[GitHub] [kafka] jolshan commented on pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13608:
URL: https://github.com/apache/kafka/pull/13608#issuecomment-1538787228

   Thanks @hachikuji! Working on the tests as well, so I will push those when they are ready.




[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205794685


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+          // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+            if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+              throw new InvalidRecordException("Record was not part of an ongoing transaction")
+            } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   One thing that is tricky is that the producer state entry used in the sequence check is actually not the 





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1199477543


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+    
+    public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {

Review Comment:
   I guess to address the previous concern about tentative sequences on epoch bumps, maybe we can just update the sequence when no entry exists.





[GitHub] [kafka] jolshan closed pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan closed pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append 
URL: https://github.com/apache/kafka/pull/13608




[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197984489


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   It does, because we don't currently check for sequence 0 on a new epoch, as far as I can tell. So we would have the same problem unless we implement something new there.





[GitHub] [kafka] artemlivshits commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "artemlivshits (via GitHub)" <gi...@apache.org>.
artemlivshits commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1194600838


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+    
+    public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {

Review Comment:
   "Tentative" state is generally useful for both idempotent and transactional producers, independently of the verification functionality -- as soon as we've got batch1 it's good to remember that we've got it, so that if we fail down the road with a retriable error, batch2 won't get in before we have a chance to retry batch1.

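   As a toy illustration of that idea (hypothetical names, not the PR's code): remember the lowest sequence seen while nothing has been appended yet, and hold back batches with higher sequences until that batch lands, so a retriable failure of batch1 doesn't let batch2 slip in first.
   ```java
   import java.util.OptionalInt;

   class TentativeSequenceSketch {
       private OptionalInt tentativeSequence = OptionalInt.empty();

       void onBatchSeen(int baseSequence) {
           // Track the lowest base sequence observed before anything is appended.
           if (!tentativeSequence.isPresent() || baseSequence < tentativeSequence.getAsInt())
               tentativeSequence = OptionalInt.of(baseSequence);
       }

       boolean outOfOrder(int baseSequence) {
           // batch2 (higher sequence) is held back until batch1 (the tentative sequence) lands.
           return tentativeSequence.isPresent() && baseSequence > tentativeSequence.getAsInt();
       }

       void onBatchAppended() {
           tentativeSequence = OptionalInt.empty(); // the diff clears this in addBatchMetadata
       }
   }
   ```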


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;

Review Comment:
   Do we use a return value?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   I don't think this is the right condition -- we only need a tentative entry when we literally have no entry (i.e. we don't know in which state the producer was when producer id got expired), not just the empty batch metadata (which can happen on the epoch bump).  If we got an epoch bump we know that the next sequence is 0 and we can reject messages that have a gap.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+    
+    private VerificationState verificationState;
+    
+    // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+    
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }

Review Comment:
   Using a finite state machine to handle race conditions is susceptible to the ABA problem, i.e. the logic may not distinguish between the original state never changing concurrently and the state changing and then changing back to the same value. In this case it looks like the following sequence is possible:
   1. Got a message, started verifying the transaction (transitioned to VERIFYING)
   2. The transaction is ongoing, so the coordinator replied with success.
   3. The transaction got aborted (transitioned to EMPTY).
   4. Got another message, started verifying the transaction (transitioned to VERIFYING)
   5. The verification for the first message completes successfully, sees that the state is VERIFYING, and allows the message to get written.
   
   This is why we (and pretty much any distributed system) use ever-incrementing states (e.g. epochs) to handle concurrency, so that we don't run into the ABA problem.
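   As a minimal sketch of the ever-incrementing alternative (hypothetical, not Kafka code): each verification captures the current counter, and the append is rejected if the counter has moved on, even if an enum-style state looks identical again.
   ```java
   import java.util.concurrent.atomic.AtomicLong;

   class MonotonicGuard {
       private final AtomicLong attempt = new AtomicLong();

       long startVerification() {
           return attempt.incrementAndGet();   // unique, ever-increasing token per verification
       }

       void onTransactionEnd() {
           attempt.incrementAndGet();          // invalidates any in-flight verification
       }

       boolean appendAllowed(long tokenFromVerification) {
           // Unlike VERIFYING -> EMPTY -> VERIFYING, the counter never returns to an old value.
           return attempt.get() == tokenFromVerification;
       }
   }
   ```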



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+          // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+            if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+              throw new InvalidRecordException("Record was not part of an ongoing transaction")
+            } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   I think we do the sequencing checks in ProducerAppendInfo; should we move this there as well so that all the checks are in one place?





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207484112


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       validateAndAssignOffsets = false,
       leaderEpoch = -1,
       requestLocal = None,
+      verificationState = Optional.empty(),

Review Comment:
   We don't do verification on non-client origin requests 👍 







[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1195788236


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+    
+    private VerificationState verificationState;
+    
+    // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+    
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }

Review Comment:
   This falls under the known gap for part 1: we can't prevent old batches from joining new transactions, but we can prevent hanging ones.
   
   Once we get epoch bumps this will work correctly.





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1195786198


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   I don't think we rely on this now. I also don't think this is currently incorrect.





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205792946


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+          // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+            if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+              throw new InvalidRecordException("Record was not part of an ongoing transaction")
+            } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   Did we want to move the verification check there? Or just the tentative sequence check?





[GitHub] [kafka] jolshan commented on pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13608:
URL: https://github.com/apache/kafka/pull/13608#issuecomment-1553256614

   @[vamossagar12](https://github.com/vamossagar12) 
   The link might have been a mistake -- I may have previously used the wrong Jira number in the title and caused confusion. That ticket is KAFKA-14844 and this is KAFKA-14884.




[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1171868072


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1051,6 +1060,8 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         try {
           val partition = getPartitionOrException(topicPartition)
+          val producerId = records.firstBatch().producerId()
+          partition.compareAndSetVerificationState(producerId, ProducerStateEntry.VerificationState.VERIFYING, ProducerStateEntry.VerificationState.VERIFIED)

Review Comment:
   This will be removed.





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207333185


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
       val sTime = time.milliseconds
       
       val transactionalProducerIds = mutable.HashSet[Long]()
+      var verificationState: Optional[VerificationState] = Optional.empty()

Review Comment:
   I don't think we would get a new one. We only need one per transaction, right?
   So either we succeed and no longer need to worry about the verification, or the partition is retried and gets the same verification object from the first time.
   
   We do have a test with multiple partitions, but I would have to check whether it also checks the verification state.





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207484112


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       validateAndAssignOffsets = false,
       leaderEpoch = -1,
       requestLocal = None,
+      verificationState = Optional.empty(),

Review Comment:
   I suppose I should confirm we won't have an issue when the verification state is blank -- I will make sure of this.





[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207368803


##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
     nodesToTransactions.synchronized {
       // Check if we have already have either node or individual transaction. Add the Node if it isn't there.
-      val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+      val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
         new TransactionDataAndCallbacks(
           new AddPartitionsToTxnTransactionCollection(1),
           mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-      val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+      val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
 
-      // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and 
-      // reconnected so return the retriable network exception.
-      if (currentTransactionData != null) {
-        val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch())
-          Errors.INVALID_PRODUCER_EPOCH
-        else 
-          Errors.NETWORK_EXCEPTION
-        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-        currentTransactionData.topics().forEach { topic =>
-          topic.partitions().forEach { partition =>
-            topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
-          }
+      // There are 3 cases if we already have existing data
+      // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced
+      // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception 
+      // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify
+      if (existingTransactionData != null) {
+        if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) {
+            val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch())
+              Errors.INVALID_PRODUCER_EPOCH
+            else
+              Errors.NETWORK_EXCEPTION
+          val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId())
+          existingNodeAndTransactionData.transactionData.remove(transactionData)
+          oldCallback(topicPartitionsToError(existingTransactionData, error))
+        } else {
+          // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately.
+          callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH))
+          return
         }
-        val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
-        currentNodeAndTransactionData.transactionData.remove(transactionData)
-        oldCallback(topicPartitionsToError.toMap)
       }
-      currentNodeAndTransactionData.transactionData.add(transactionData)
-      currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+

Review Comment:
   I'll separate this out into a new PR as I'm already splitting this up.





[GitHub] [kafka] vamossagar12 commented on pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13608:
URL: https://github.com/apache/kafka/pull/13608#issuecomment-1553261622

   @jolshan, no problem at all! Thanks for confirming.




[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1199478295


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   I guess we can simply call this in the case where the entry did not yet exist. See the comment above.





[GitHub] [kafka] vamossagar12 commented on pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13608:
URL: https://github.com/apache/kafka/pull/13608#issuecomment-1546652807

   Hey @jolshan, looks like this PR got tagged in a Connect-related Jira: https://issues.apache.org/jira/browse/KAFKA-14844.




[GitHub] [kafka] jolshan commented on pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13608:
URL: https://github.com/apache/kafka/pull/13608#issuecomment-1544627294

   I forgot this also covers the race where the end txn marker is written before the produce completes.
   (We reset the verification status to empty on the marker, so the produce request will fail.)
   
   Will add a test for this and include it in the description.
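   
   A minimal sketch of that flow (hypothetical harness, using the EMPTY/VERIFYING/VERIFIED states from this PR, not the actual test code):
   
   ```java
   // Hypothetical illustration: the end txn marker lands before the verified produce
   // request is appended, the marker clears the verification state, and the append
   // is rejected because the state is no longer VERIFIED.
   enum VerificationState { EMPTY, VERIFYING, VERIFIED }
   
   final class MarkerBeforeProduceSketch {
       private VerificationState state = VerificationState.EMPTY;
   
       void startVerification()     { state = VerificationState.VERIFYING; }
       void verificationSucceeded() { if (state == VerificationState.VERIFYING) state = VerificationState.VERIFIED; }
       void appendEndTxnMarker()    { state = VerificationState.EMPTY; } // marker resets the state
   
       boolean transactionalAppendAllowed() {
           // mirrors the check before append: only VERIFIED may proceed
           return state == VerificationState.VERIFIED;
       }
   
       public static void main(String[] args) {
           MarkerBeforeProduceSketch partition = new MarkerBeforeProduceSketch();
           partition.startVerification();
           partition.verificationSucceeded();  // produce was verified...
           partition.appendEndTxnMarker();     // ...but the abort/commit marker arrives first
           System.out.println(partition.transactionalAppendAllowed()); // false -> produce request fails
       }
   }
   ```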




[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197986988


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+    
+    public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {

Review Comment:
   I think we agreed offline that there were some pitfalls in applying this change to non-verification cases, so we would hold off until later.







[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207324170


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       validateAndAssignOffsets = false,
       leaderEpoch = -1,
       requestLocal = None,
+      verificationState = Optional.empty(),

Review Comment:
   We currently don't. 
   ~I guess there is an argument that we could have a marker come in before a fetch response.~ 
   EDIT: if we don't replicate in order we have a problem, so I don't think we need to cover this.
   
   
   I'm not sure how this would be implemented. 


