You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "hachikuji (via GitHub)" <gi...@apache.org> on 2023/05/08 17:14:17 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

hachikuji commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1187662801


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -103,28 +121,44 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
             return false;
         }
     }
+    
+    public void maybeUpdateTentaitiveSequence(int sequence) {

Review Comment:
   typo: `Tentaitive` should be `Tentative`



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -103,28 +121,44 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
             return false;
         }
     }
+    
+    public void maybeUpdateTentaitiveSequence(int sequence) {
+        if (batchMetadata.isEmpty() && (!this.tentativeSequence.isPresent() || this.tentativeSequence.getAsInt() > sequence))
+            this.tentativeSequence = OptionalInt.of(sequence);
+    }
 
     private void addBatchMetadata(BatchMetadata batch) {
+        // When appending a batch, we no longer need tentative sequence.
+        this.tentativeSequence = OptionalInt.empty();
         if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) batchMetadata.removeFirst();
         batchMetadata.add(batch);
     }
+    
+    public boolean compareAndSetVerificationState(short expectedProducerEpoch, VerificationState expectedVerificationState, VerificationState newVerificationState) {
+        if (expectedProducerEpoch == this.producerEpoch && verificationState == expectedVerificationState) {
+            this.verificationState = newVerificationState;
+            return true;
+        }
+        return false;
+    }
 
     public void update(ProducerStateEntry nextEntry) {
-        update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch, nextEntry.lastTimestamp, nextEntry.batchMetadata, nextEntry.currentTxnFirstOffset);
+        update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch, nextEntry.lastTimestamp, nextEntry.batchMetadata, nextEntry.currentTxnFirstOffset, nextEntry.verificationState);
     }
 
     public void update(short producerEpoch, int coordinatorEpoch, long lastTimestamp) {
-        update(producerEpoch, coordinatorEpoch, lastTimestamp, new ArrayDeque<>(0), OptionalLong.empty());
+        update(producerEpoch, coordinatorEpoch, lastTimestamp, new ArrayDeque<>(0), OptionalLong.empty(), VerificationState.EMPTY);
     }
 
     private void update(short producerEpoch, int coordinatorEpoch, long lastTimestamp, Deque<BatchMetadata> batchMetadata,
-                        OptionalLong currentTxnFirstOffset) {
+                        OptionalLong currentTxnFirstOffset, VerificationState verificationState) {

Review Comment:
   nit: when an argument list grows beyond 2 or 3 arguments, it's helpful to put each argument on a separate line:
   ```java
   private void update(
     short producerEpoch,
     int coordinatorEpoch,
     long lastTimestamp,
     ...
   ) {
   ```



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+    
+    private VerificationState verificationState;
+    
+    // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+    
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }
 
     public static ProducerStateEntry empty(long producerId) {
-        return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty());
+        return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty(), VerificationState.EMPTY, OptionalInt.empty());
+    }
+
+    public static ProducerStateEntry forVerification(long producerId, short producerEpoch, long milliseconds) {

Review Comment:
   nit: `milliseconds` -> `lastTimestamp`?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+    
+    public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) {
+        ProducerStateEntry entry;
+        if (producers.containsKey(producerId)) {

Review Comment:
   nit: usually we would call `get` and check for null (saves one hash lookup)



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,9 +579,33 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
-    val entry = producerStateManager.activeProducers.get(producerId)
-    entry != null && entry.currentTxnFirstOffset.isPresent
+  def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, baseSequence: Int): Boolean = lock synchronized {
+    val entry = producerStateManager.entryForVerification(producerId, producerEpoch, baseSequence)
+    (!entry.currentTxnFirstOffset.isPresent) &&
+      (entry.compareAndSetVerificationState(producerEpoch, ProducerStateEntry.VerificationState.EMPTY, ProducerStateEntry.VerificationState.VERIFYING) ||
+        entry.verificationState() == ProducerStateEntry.VerificationState.VERIFYING)
+  }
+  
+  def compareAndSetVerificationState(producerId: Long,
+                                     producerEpoch: Short,
+                                     baseSequence: Int,
+                                     expectedVerificationState: ProducerStateEntry.VerificationState,
+                                     newVerificationState: ProducerStateEntry.VerificationState): Unit = { lock synchronized {

Review Comment:
   nit: can drop braces around `lock synchronized`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org