Posted to jira@kafka.apache.org by "artemlivshits (via GitHub)" <gi...@apache.org> on 2023/05/16 22:59:32 UTC

[GitHub] [kafka] artemlivshits commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

artemlivshits commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1194600838


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+    
+    public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {

Review Comment:
   A "tentative" state is generally useful for both idempotent and transactional producers, independently of the verification functionality: as soon as we've got batch1, it's good to remember that we've got it, so that if we fail down the road with a retriable error, batch2 won't get in before we have a chance to retry batch1.
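
   A minimal sketch of that idea, assuming a hypothetical `TentativeSequenceSketch` class (not the actual `ProducerStateManager` API): the broker remembers the lowest first sequence it has seen for a producer and holds back any batch with a higher sequence until the remembered one has been appended.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; names and structure are assumptions,
// not the code in this PR.
final class TentativeSequenceSketch {
    // producerId -> lowest first sequence seen but not yet appended
    private final Map<Long, Integer> tentative = new HashMap<>();

    // Called when a batch arrives, before it is appended.
    // Returns true if the batch may proceed, false if the client should retry later.
    boolean onBatchReceived(long producerId, int firstSequence) {
        Integer lowest = tentative.get(producerId);
        if (lowest == null || firstSequence < lowest) {
            // First batch seen, or an even lower sequence: remember it.
            tentative.put(producerId, firstSequence);
            return true;
        }
        // A retry of the remembered batch is fine; anything higher must wait.
        return firstSequence == lowest;
    }

    // Called once the remembered batch has actually been written to the log.
    // From here on, the regular sequence checks would take over.
    void onBatchAppended(long producerId, int firstSequence) {
        tentative.remove(producerId, firstSequence);
    }
}
```

   With this, if batch1 (sequence 0) fails with a retriable error, batch2 (sequence 5, say) is turned away until batch1 has been retried and appended.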



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;

Review Comment:
   Do we use the return value anywhere?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   I don't think this is the right condition. We only need a tentative entry when we literally have no entry (i.e. we don't know what state the producer was in when its producer ID expired), not merely when the batch metadata is empty (which can also happen on an epoch bump).  If we got an epoch bump, we know that the next sequence is 0 and we can reject messages that have a gap.
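
   Roughly, the distinction I have in mind could be sketched like this. `SequenceCheckSketch`, `Decision`, and the parameters are hypothetical names for illustration, and sequence wrap-around is ignored:

```java
import java.util.OptionalInt;

// Hypothetical illustration only; not the code in this PR.
final class SequenceCheckSketch {
    enum Decision { ACCEPT, REJECT_GAP, NEEDS_TENTATIVE }

    // hasEntry:      whether the broker still holds state for this producer ID
    // lastSequence:  last appended sequence in the current epoch,
    //                empty right after an epoch bump
    // firstSequence: first sequence of the incoming batch
    static Decision check(boolean hasEntry, OptionalInt lastSequence, int firstSequence) {
        if (!hasEntry) {
            // Producer state is unknown (e.g. the ID expired): only here is a
            // tentative sequence needed.
            return Decision.NEEDS_TENTATIVE;
        }
        // With an entry, the expected sequence is known: 0 after an epoch bump,
        // otherwise the last sequence plus one.
        int expected = lastSequence.isPresent() ? lastSequence.getAsInt() + 1 : 0;
        return firstSequence == expected ? Decision.ACCEPT : Decision.REJECT_GAP;
    }
}
```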



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+    
+    private VerificationState verificationState;
+    
+    // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+    
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }

Review Comment:
   Using a finite state machine to handle race conditions is susceptible to the ABA problem, i.e. the logic may not distinguish between the original state never changing and the state changing and then changing back to the same value.  In this case it looks like the following sequence is possible:
   1. Got a message, started verifying the transaction (transitioned to VERIFYING).
   2. The transaction is ongoing, so replied with success.
   3. The transaction got aborted (transitioned to EMPTY).
   4. Got another message, started verifying the transaction (transitioned to VERIFYING).
   5. The first message's verification completes successfully, sees that the state is VERIFYING, and allows the message to get written.
   
   This is why we (and pretty much any distributed system) use ever-incrementing values (e.g. epochs) to handle concurrency, so that we don't run into the ABA problem.
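
   As a rough sketch of the alternative (the class and method names here are hypothetical, not a proposal for the exact API): each verification gets a token from a counter that only ever increases, and the append is allowed only if the token is still current.

```java
// Hypothetical illustration only; not the code in this PR.
final class VerificationGuardSketch {
    private long currentEpoch = 0;     // bumped on every state change
    private boolean verifying = false;

    // Step 1 / step 4: a message arrives and verification starts.
    // The returned token identifies this particular verification.
    synchronized long startVerification() {
        verifying = true;
        return ++currentEpoch;
    }

    // Step 3: the transaction is aborted (or committed).
    synchronized void onTransactionEnded() {
        verifying = false;
        currentEpoch++;
    }

    // Step 5: checking the state alone would pass for a stale verification;
    // comparing the token rejects it.
    synchronized boolean mayAppend(long token) {
        return verifying && token == currentEpoch;
    }
}
```

   Replaying the sequence above: the first verification gets token 1, the abort bumps the counter to 2, the second verification gets token 3, and the late completion of the first verification is rejected because 1 is no longer current.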



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+          // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+            if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+              throw new InvalidRecordException("Record was not part of an ongoing transaction")
+            } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   I think we do the sequencing checks in ProducerAppendInfo; should we move this one there as well, so that all the checks are in one place?


