You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "jolshan (via GitHub)" <gi...@apache.org> on 2023/05/31 18:27:24 UTC

[GitHub] [kafka] jolshan opened a new pull request, #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

jolshan opened a new pull request, #13787:
URL: https://github.com/apache/kafka/pull/13787

   This is a redo of https://github.com/apache/kafka/pull/13608
   
   Introduced extra mapping to track verification state.
   
   When verifying, there is a race condition that the add partitions verification response returns that the partition is in the ongoing transaction, but an abort marker is written before we get to append. Therefore, we track any given transaction we are verifying with an object unique to that transaction. 
   
   We check this unique state upon the first append to the log. After that, we can rely on currentTransactionFirstOffset. We remove the verification state on appending to the log with a transactional data record or marker.
   
   We will also clean up lingering verification state entries via the producer state entry expiration mechanism. We do not update the the timestamp on retrying a verification for a transaction, so each entry must be verified before producer.id.expiration.ms.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1255778900


##########
server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java:
##########
@@ -118,6 +118,11 @@ protected void pollOnce(long maxTimeoutMs) {
                 // DisconnectException is expected when NetworkClient#initiateClose is called
                 return;
             }
+

Review Comment:
   nit: Should we remove this empty line as we have no others in this code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1224505468


##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -3273,17 +3272,35 @@ class PartitionTest extends AbstractPartitionTest {
       baseOffset = 0L,
       producerId = producerId)
     partition.appendRecordsToLeader(idempotentRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
-    assertFalse(partition.hasOngoingTransaction(producerId))
 
-    val transactionRecords = createTransactionalRecords(List(
+    def transactionRecords() = createTransactionalRecords(List(
       new SimpleRecord("k1".getBytes, "v1".getBytes),
       new SimpleRecord("k2".getBytes, "v2".getBytes),
       new SimpleRecord("k3".getBytes, "v3".getBytes)),
       baseOffset = 0L,
       baseSequence = 3,
       producerId = producerId)
-    partition.appendRecordsToLeader(transactionRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
-    assertTrue(partition.hasOngoingTransaction(producerId))
+
+    // When verification guard is not there, we should not be able to append.
+    assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))
+
+    // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null verification object.
+    val verificationGuard = partition.maybeStartTransactionVerification(producerId)
+    assertTrue(verificationGuard != null)

Review Comment:
   Sorry I missed PartitionTest 🤦‍♀️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221998443


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,6 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  /**
+   * Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
+   * Creation starts the verification process. Otherwise return null.
+   */
+  def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized {
+    if (hasOngoingTransaction(producerId))
+      null
+    else
+      verificationGuard(producerId, true)
+  }
+
+  /**
+   * Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null.
+   */
+  def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized {

Review Comment:
   Yeah, keeping it makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221910510


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1006,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
+          // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
+          // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
+          // There are two phases -- the first append to the log and subsequent appends.
+          //
+          // 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and
+          // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
+          // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
+          // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
+          // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not
+          // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
+          //
+          // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
+          // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
+          // ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
+          // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled())
+            if (!hasOngoingTransaction(batch.producerId) && (requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null))

Review Comment:
   This is a hold over from the previous pr that also included tentative state. Since we may move that, I can make this all one. (But I may make it a helper since this is quite a lot of logic. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221909838


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,6 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  /**
+   * Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
+   * Creation starts the verification process. Otherwise return null.
+   */
+  def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized {
+    if (hasOngoingTransaction(producerId))
+      null
+    else
+      verificationGuard(producerId, true)
+  }
+
+  /**
+   * Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null.
+   */
+  def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized {

Review Comment:
   This is also used in unifiedLog when we create the object and when we do the final check. But I can make it package private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1214561645


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer-id.
+ * It contains a verificationState object that is used to uniquely identify the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions.
+ * We remove the verification state whenever we write data to the transaction or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+    private final long producerId;

Review Comment:
   We can remove it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan merged pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan merged PR #13787:
URL: https://github.com/apache/kafka/pull/13787


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1247245146


##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -507,9 +507,14 @@ class RequestChannel(val queueSize: Int,
   def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
 
   def sendCallbackRequest(request: CallbackRequest): Unit = {
-    callbackQueue.put(request)
-    if (!requestQueue.offer(RequestChannel.WakeupRequest))
-      trace("Wakeup request could not be added to queue. This means queue is full, so we will still process callback.")
+    try {
+      callbackQueue.put(request)
+      if (!requestQueue.offer(RequestChannel.WakeupRequest))
+        trace("Wakeup request could not be added to queue. This means queue is full, so we will still process callback.")
+    } catch {
+      case e: Throwable =>
+        warn("There was an exception when attempting to send the callback request", e)

Review Comment:
   Hmmm -- I guess we would not send a response. Do we send a response normally though if we shutdown during handling a request? Or do we just rely on it to time out?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1255783931


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -578,7 +578,7 @@ class Partition(val topicPartition: TopicPartition,
 
   // Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
   def maybeStartTransactionVerification(producerId: Long): Object = {
-    leaderLogIfLocal match {
+    log match {

Review Comment:
   Should we add a test to avoid regressing in the future?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1256502612


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -578,7 +578,7 @@ class Partition(val topicPartition: TopicPartition,
 
   // Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
   def maybeStartTransactionVerification(producerId: Long): Object = {
-    leaderLogIfLocal match {
+    log match {

Review Comment:
   I am a bit concerned with changing the flow here though. At the very least I suspect that some produce requests will be slower since we will have to retry in some cases when we elected. Maybe this isn't a big concern, but I still think it is a larger change.
   
   As for the log not existing -- would we elect a log as leader and send a request to it before the log exists? 
   
   @hachikuji what do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1258915597


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -578,7 +578,7 @@ class Partition(val topicPartition: TopicPartition,
 
   // Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
   def maybeStartTransactionVerification(producerId: Long): Object = {
-    leaderLogIfLocal match {
+    log match {

Review Comment:
   Returning an error seems straightforward. No harm done if the broker comes leader just after since the client will retry. Given how hard the scenario was to reproduce, I'm not too concerned about impact from retries.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -578,7 +578,7 @@ class Partition(val topicPartition: TopicPartition,
 
   // Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
   def maybeStartTransactionVerification(producerId: Long): Object = {
-    leaderLogIfLocal match {
+    log match {

Review Comment:
   Returning an error seems straightforward. No harm done if the broker becomes leader just after since the client will retry. Given how hard the scenario was to reproduce, I'm not too concerned about impact from retries.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14884: Include check transaction is still ongoing right before append (take 2) [kafka]

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1361468366


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1279,7 +1283,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
-                            requestLocal: RequestLocal): LogAppendInfo = {
+                            requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {

Review Comment:
   And having `null` as the default makes it even more unsafe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221986657


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,6 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  /**
+   * Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
+   * Creation starts the verification process. Otherwise return null.
+   */
+  def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized {
+    if (hasOngoingTransaction(producerId))
+      null
+    else
+      verificationGuard(producerId, true)
+  }
+
+  /**
+   * Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null.
+   */
+  def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized {

Review Comment:
   Hmmm...
   ```
   /Users/justineolshan/kafka/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:2792:56: method getOrMaybeCreateVerificationGuard in class UnifiedLog cannot be accessed as a member of kafka.log.UnifiedLog from class ReplicaManagerTest in package server
   ```
   
   I can potentially refactor this to not check verification guard in these tests, but I do think it is useful. What do you think about leaving as not package private?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221909838


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,6 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  /**
+   * Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
+   * Creation starts the verification process. Otherwise return null.
+   */
+  def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized {
+    if (hasOngoingTransaction(producerId))
+      null
+    else
+      verificationGuard(producerId, true)
+  }
+
+  /**
+   * Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null.
+   */
+  def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized {

Review Comment:
   This is also used in unifiedLog when we do the final check. But I can make it package private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1220078561


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -575,8 +575,12 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = {
-    leaderLogIfLocal.exists(leaderLog => leaderLog.hasOngoingTransaction(producerId))
+  // Returns a verificationGuard object if we need to verify. This starts or continues the verification process. Otherwise return null.

Review Comment:
   I can swap all these to not use this term. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1247229531


##########
core/src/main/scala/kafka/network/RequestChannel.scala:
##########
@@ -507,9 +507,14 @@ class RequestChannel(val queueSize: Int,
   def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
 
   def sendCallbackRequest(request: CallbackRequest): Unit = {
-    callbackQueue.put(request)
-    if (!requestQueue.offer(RequestChannel.WakeupRequest))
-      trace("Wakeup request could not be added to queue. This means queue is full, so we will still process callback.")
+    try {
+      callbackQueue.put(request)
+      if (!requestQueue.offer(RequestChannel.WakeupRequest))
+        trace("Wakeup request could not be added to queue. This means queue is full, so we will still process callback.")
+    } catch {
+      case e: Throwable =>
+        warn("There was an exception when attempting to send the callback request", e)

Review Comment:
   What happens to the request inside the `CallbackRequest` object? Do we need to send a response?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1214639262


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
       val sTime = time.milliseconds
       
       val transactionalProducerIds = mutable.HashSet[Long]()
+      var verificationState: Object = null

Review Comment:
   I've fixed this and added a test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221911516


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2095,82 +2105,143 @@ class ReplicaManagerTest {
   }
 
   @Test
-  def testVerificationForTransactionalPartitions(): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    val transactionalId = "txn1"
+  def testVerificationForTransactionalPartitionsOnly(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
     val producerId = 24L
     val producerEpoch = 0.toShort
     val sequence = 0
-    
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val metadataCache = mock(classOf[MetadataCache])
+    val node = new Node(0, "host1", 0)
     val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
 
-    val replicaManager = new ReplicaManager(
-      metrics = metrics,
-      config = config,
-      time = time,
-      scheduler = new MockScheduler(time),
-      logManager = mockLogMgr,
-      quotaManagers = quotaManager,
-      metadataCache = metadataCache,
-      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager,
-      addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
-
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1), node)
     try {
-      val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), LeaderAndIsr(1,  List(0, 1)))
-      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      // We must set up the metadata cache to handle the append and verification.
-      val metadataResponseTopic = Seq(new MetadataResponseTopic()
-        .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
-        .setPartitions(Seq(
-          new MetadataResponsePartition()
-            .setPartitionIndex(0)
-            .setLeaderId(0)).asJava))
-      val node = new Node(0, "host1", 0)
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      when(metadataCache.contains(tp)).thenReturn(true)
-      when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic)
-      when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node))
-      when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None)
-      
-      // We will attempt to schedule to the request handler thread using a non request handler thread. Set this to avoid error.
-      KafkaRequestHandler.setBypassThreadCheck(true)
+      // If we supply no transactional ID and idempotent records, we do not verify.
+      val idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecords(replicaManager, tp0, idempotentRecords)
+      verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+
+      // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records.
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      val idempotentRecords2 = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecordsToMultipleTopics(replicaManager, Map(tp0 -> transactionalRecords, tp1 -> idempotentRecords2), transactionalId, Some(0))
+      verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertNotEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(null, getVerificationGuard(replicaManager, tp1, producerId))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testVerificationFlow(): Unit = {

Review Comment:
   I'm too into transaction world that I forget not everyone is there. 😵‍💫



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1261385072


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3057,6 +3057,56 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() {
         assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
     }
 
+    @Test
+    public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+        TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 60000, 100, apiVersions);
+
+        setupWithTransactionState(txnManager);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartition(tp0);
+        client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
+        sender.runOnce();
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
+        sender.runOnce();  // send request
+
+        Node node = metadata.fetch().nodes().get(0);
+        time.sleep(2000L);

Review Comment:
   If we don't sleep, then the future does not complete. There is another test in the file that explains this only expires the batch if it hits the delivery timeout
   
   ```
           // We add 600 millis to expire the first batch but not the second.
           // Note deliveryTimeoutMs is 1500.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14884: Include check transaction is still ongoing right before append (take 2) [kafka]

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1362920887


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1279,7 +1283,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
-                            requestLocal: RequestLocal): LogAppendInfo = {
+                            requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {

Review Comment:
   > I don't think there is an expectation that the string literals would be different objects, this code allocates new objects to get unique identity.
   
   Right, but the lack of strong typing doesn't prevent the wrong instance from being passed. That's why we should not use `Object` like that. In practice, it results in avoidable bugs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1261444204


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3057,6 +3057,56 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() {
         assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
     }
 
+    @Test
+    public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+        TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 60000, 100, apiVersions);
+
+        setupWithTransactionState(txnManager);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartition(tp0);
+        client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
+        sender.runOnce();
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
+        sender.runOnce();  // send request
+
+        Node node = metadata.fetch().nodes().get(0);
+        time.sleep(2000L);
+        client.disconnect(node.idString(), true);
+        client.backoff(node, 10);
+
+        sender.runOnce(); // now expire the batch.
+        assertFutureFailure(request1, TimeoutException.class);
+
+        time.sleep(20);
+
+        sendIdempotentProducerResponse(0, tp0, Errors.INVALID_RECORD, -1);
+        sender.runOnce(); // receive late response
+
+        // Loop once and confirm that the transaction manager does not enter a fatal error state
+        sender.runOnce();
+        assertTrue(txnManager.hasAbortableError());
+        TransactionalRequestResult result = txnManager.beginAbort();
+        sender.runOnce();
+
+        respondToEndTxn(Errors.NONE);
+        sender.runOnce();
+        assertTrue(txnManager::isInitializing);

Review Comment:
   I'm following conventions from the rest of the file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1218665197


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -184,6 +186,27 @@ private void clearProducerIds() {
         producerIdCount = 0;
     }
 
+    /**
+     * Maybe create the VerificationStateEntry for a given producer ID. Return it if it exists, otherwise return null.
+     */
+    public VerificationStateEntry verificationStateEntry(long producerId, boolean createIfAbsent) {
+        return verificationStates.computeIfAbsent(producerId, pid -> {
+            if (createIfAbsent)
+                return new VerificationStateEntry(pid, time.milliseconds());
+            else {
+                log.warn("The given producer ID did not have an entry in the producer state manager, so it's state will be returned as null");

Review Comment:
   Good point -- I can remove this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1219799829


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,9 +579,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  /**
+   * Maybe create and return the verificationGuard object for the given producer ID if the transaction is not yet ongoing.

Review Comment:
   ditto.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1005,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
+          // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
+          // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
+          // There are two phases -- the first append to the log and subsequent appends.
+          //
+          // 1. First append: Verification starts with creating a verificationGuard, sending a verification request to the transaction coordinator, and

Review Comment:
   nit `verification guard` for consistency. The are a few other cases in this comment.



##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -1087,6 +1087,46 @@ class ProducerStateManagerTest {
     assertTrue(!manager.latestSnapshotOffset.isPresent)
   }
 
+  @Test
+  def testEntryForVerification(): Unit = {
+    val originalEntry = stateManager.verificationStateEntry(producerId, true)
+    val originalEntryVerificationGuard = originalEntry.verificationGuard()
+
+    def verifyEntry(producerId: Long, newEntry: VerificationStateEntry): Unit = {
+      val entry = stateManager.verificationStateEntry(producerId, false)
+      assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
+      assertEquals(entry.verificationGuard, newEntry.verificationGuard)
+    }
+
+    // If we already have an entry, reuse it.
+    val updatedEntry = stateManager.verificationStateEntry(producerId, true)
+    verifyEntry(producerId, updatedEntry)
+
+    // Before we add transactional data, we can't remove the entry.
+    stateManager.clearVerificationStateEntry(producerId)
+    verifyEntry(producerId, updatedEntry)
+
+    // Add the transactional data and clear the entry
+    append(stateManager, producerId, 0, 0, offset = 0, isTransactional = true)
+    stateManager.clearVerificationStateEntry(producerId)
+    assertEquals(null, stateManager.verificationStateEntry(producerId, false))
+

Review Comment:
   nit: Empty line could be removed.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -575,8 +575,12 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def hasOngoingTransaction(producerId: Long): Boolean = {
-    leaderLogIfLocal.exists(leaderLog => leaderLog.hasOngoingTransaction(producerId))
+  // Returns a verificationGuard object if we need to verify. This starts or continues the verification process. Otherwise return null.

Review Comment:
   nit: `verification guard object`? I find `verificationGuard` confusing because we actually return an `Object`.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1005,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
+          // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
+          // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
+          // There are two phases -- the first append to the log and subsequent appends.
+          //
+          // 1. First append: Verification starts with creating a verificationGuard, sending a verification request to the transaction coordinator, and
+          // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
+          // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
+          // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
+          // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verificationGuard, this sequence would not
+          // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
+          //
+          // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
+          // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
+          // ongoing. If the transaction is expected to be ongoing, we will not st a verification guard. If the transaction is aborted, hasOngoingTransaction is false and

Review Comment:
   nit: s/st/set



##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -1087,6 +1087,46 @@ class ProducerStateManagerTest {
     assertTrue(!manager.latestSnapshotOffset.isPresent)
   }
 
+  @Test
+  def testEntryForVerification(): Unit = {
+    val originalEntry = stateManager.verificationStateEntry(producerId, true)
+    val originalEntryVerificationGuard = originalEntry.verificationGuard()
+
+    def verifyEntry(producerId: Long, newEntry: VerificationStateEntry): Unit = {
+      val entry = stateManager.verificationStateEntry(producerId, false)
+      assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
+      assertEquals(entry.verificationGuard, newEntry.verificationGuard)
+    }
+
+    // If we already have an entry, reuse it.
+    val updatedEntry = stateManager.verificationStateEntry(producerId, true)
+    verifyEntry(producerId, updatedEntry)
+
+    // Before we add transactional data, we can't remove the entry.
+    stateManager.clearVerificationStateEntry(producerId)
+    verifyEntry(producerId, updatedEntry)
+
+    // Add the transactional data and clear the entry

Review Comment:
   nit: `.`



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,9 +579,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  /**
+   * Maybe create and return the verificationGuard object for the given producer ID if the transaction is not yet ongoing.
+   * Creation starts the verification process. Otherwise return null.
+   */
+  def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized {
+    if (hasOngoingTransaction(producerId))
+      null
+    else
+      verificationGuard(producerId, true)
+  }
+
+  /**
+   * Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null.
+   */
+  def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized {
+    val entry = producerStateManager.verificationStateEntry(producerId, createIfAbsent)
+    if (entry != null) entry.verificationGuard else null
+  }
+
+  /**
+   * Return true if the given producer ID has a transaction ongoing.
+   */
   def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
-    val entry = producerStateManager.activeProducers.get(producerId)
-    entry != null && entry.currentTxnFirstOffset.isPresent
+    producerStateManager.activeProducers.getOrDefault(producerId, ProducerStateEntry.empty(producerId)).currentTxnFirstOffset.isPresent

Review Comment:
   nit: The previous version seems better to me as we don't have to allocate `ProducerStateEntry.empty(producerId)` all the time.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer id.
+ * It contains a verificationGuard object that is used to uniquely identify the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions.
+ * We remove the verificationGuard whenever we write data to the transaction or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+    private long timestamp;
+    private Object verificationGuard;

Review Comment:
   nit: Those two could be `final`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14884: Include check transaction is still ongoing right before append (take 2) [kafka]

Posted by "artemlivshits (via GitHub)" <gi...@apache.org>.
artemlivshits commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1362733029


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1279,7 +1283,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
-                            requestLocal: RequestLocal): LogAppendInfo = {
+                            requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {

Review Comment:
   Ok, maybe it was confusing to refer to object identity as its address, let me make a more precise statement: 2 independently allocated objects cannot reside at the same address at the same point in time, so `new Object != new Object` is true at any given point in time, no matter what GC does, and that's the property that's used in this change.
   
   I don't think there is an expectation that the string literals would be different objects, this code allocates new objects to get unique identity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14884: Include check transaction is still ongoing right before append (take 2) [kafka]

Posted by "artemlivshits (via GitHub)" <gi...@apache.org>.
artemlivshits commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1361539171


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1279,7 +1283,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
-                            requestLocal: RequestLocal): LogAppendInfo = {
+                            requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {

Review Comment:
   We don't use object content, we just use object identity (i.e. its address), so passing anything is perfectly fine - 2 different objects would have 2 different addresses.  `null` works because we know that it is not equal to any real object identity.
   
   The scenario that we're solving is this: we need to check some condition under lock, then we need to release the lock and do a bunch of work (that includes sending RPCs and etc.), then once the work is done, we need to make sure that the condition still holds.  So to implement the solution, we create an object and remember a reference to it.  If the condition is changed, the reference is set to `null`.  So when we come back, we check if the reference is still set to the object that we remember, if it's not then we've got raced and the condition isn't valid anymore.  Naively, it may seem that we can use just a Boolean flag, and set it to `true` when we start validation, and if the condition is changed, we set the flag to `false`.  This approach, however, is prone to ABA problem.
   
   There are some details here: https://github.com/apache/kafka/pull/13787#discussion_r1213675427



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1223961441


##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -3273,17 +3272,35 @@ class PartitionTest extends AbstractPartitionTest {
       baseOffset = 0L,
       producerId = producerId)
     partition.appendRecordsToLeader(idempotentRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
-    assertFalse(partition.hasOngoingTransaction(producerId))
 
-    val transactionRecords = createTransactionalRecords(List(
+    def transactionRecords() = createTransactionalRecords(List(
       new SimpleRecord("k1".getBytes, "v1".getBytes),
       new SimpleRecord("k2".getBytes, "v2".getBytes),
       new SimpleRecord("k3".getBytes, "v3".getBytes)),
       baseOffset = 0L,
       baseSequence = 3,
       producerId = producerId)
-    partition.appendRecordsToLeader(transactionRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
-    assertTrue(partition.hasOngoingTransaction(producerId))
+
+    // When verification guard is not there, we should not be able to append.
+    assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))
+
+    // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null verification object.
+    val verificationGuard = partition.maybeStartTransactionVerification(producerId)
+    assertTrue(verificationGuard != null)
+
+    // With the wrong verification guard, append should fail.
+    assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(),
+      origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, Optional.of(new Object)))
+
+    // We should return the same verification object when we still need to verify. Append should proceed.
+    val verificationGuard2 = partition.maybeStartTransactionVerification(producerId)
+    assertEquals(verificationGuard, verificationGuard2)
+    partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, verificationGuard)
+
+    // We should no longer need a verification object. Future appends without verification guard will also succeed.
+    val verificationGuard3 = partition.maybeStartTransactionVerification(producerId)
+    assertEquals(null, verificationGuard3)

Review Comment:
   nit: assetNull



##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -1087,6 +1087,45 @@ class ProducerStateManagerTest {
     assertTrue(!manager.latestSnapshotOffset.isPresent)
   }
 
+  @Test
+  def testEntryForVerification(): Unit = {
+    val originalEntry = stateManager.verificationStateEntry(producerId, true)
+    val originalEntryVerificationGuard = originalEntry.verificationGuard()
+
+    def verifyEntry(producerId: Long, newEntry: VerificationStateEntry): Unit = {
+      val entry = stateManager.verificationStateEntry(producerId, false)
+      assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
+      assertEquals(entry.verificationGuard, newEntry.verificationGuard)
+    }
+
+    // If we already have an entry, reuse it.
+    val updatedEntry = stateManager.verificationStateEntry(producerId, true)
+    verifyEntry(producerId, updatedEntry)
+
+    // Before we add transactional data, we can't remove the entry.
+    stateManager.clearVerificationStateEntry(producerId)
+    verifyEntry(producerId, updatedEntry)
+
+    // Add the transactional data and clear the entry.
+    append(stateManager, producerId, 0, 0, offset = 0, isTransactional = true)
+    stateManager.clearVerificationStateEntry(producerId)
+    assertEquals(null, stateManager.verificationStateEntry(producerId, false))

Review Comment:
   nit: assertNull



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2095,82 +2105,143 @@ class ReplicaManagerTest {
   }
 
   @Test
-  def testVerificationForTransactionalPartitions(): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    val transactionalId = "txn1"
+  def testVerificationForTransactionalPartitionsOnly(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
     val producerId = 24L
     val producerEpoch = 0.toShort
     val sequence = 0
-    
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val metadataCache = mock(classOf[MetadataCache])
+    val node = new Node(0, "host1", 0)
     val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
 
-    val replicaManager = new ReplicaManager(
-      metrics = metrics,
-      config = config,
-      time = time,
-      scheduler = new MockScheduler(time),
-      logManager = mockLogMgr,
-      quotaManagers = quotaManager,
-      metadataCache = metadataCache,
-      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager,
-      addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
-
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1), node)
     try {
-      val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), LeaderAndIsr(1,  List(0, 1)))
-      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      // We must set up the metadata cache to handle the append and verification.
-      val metadataResponseTopic = Seq(new MetadataResponseTopic()
-        .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
-        .setPartitions(Seq(
-          new MetadataResponsePartition()
-            .setPartitionIndex(0)
-            .setLeaderId(0)).asJava))
-      val node = new Node(0, "host1", 0)
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      when(metadataCache.contains(tp)).thenReturn(true)
-      when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic)
-      when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node))
-      when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None)
-      
-      // We will attempt to schedule to the request handler thread using a non request handler thread. Set this to avoid error.
-      KafkaRequestHandler.setBypassThreadCheck(true)
+      // If we supply no transactional ID and idempotent records, we do not verify.
+      val idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecords(replicaManager, tp0, idempotentRecords)
+      verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertNull(getVerificationGuard(replicaManager, tp0, producerId))
+
+      // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records.
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      val idempotentRecords2 = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecordsToMultipleTopics(replicaManager, Map(tp0 -> transactionalRecords, tp1 -> idempotentRecords2), transactionalId, Some(0))
+      verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertNotEquals(null, getVerificationGuard(replicaManager, tp0, producerId))

Review Comment:
   nit: assertNotNull



##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -1087,6 +1087,45 @@ class ProducerStateManagerTest {
     assertTrue(!manager.latestSnapshotOffset.isPresent)
   }
 
+  @Test
+  def testEntryForVerification(): Unit = {
+    val originalEntry = stateManager.verificationStateEntry(producerId, true)
+    val originalEntryVerificationGuard = originalEntry.verificationGuard()
+
+    def verifyEntry(producerId: Long, newEntry: VerificationStateEntry): Unit = {
+      val entry = stateManager.verificationStateEntry(producerId, false)
+      assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
+      assertEquals(entry.verificationGuard, newEntry.verificationGuard)
+    }
+
+    // If we already have an entry, reuse it.
+    val updatedEntry = stateManager.verificationStateEntry(producerId, true)
+    verifyEntry(producerId, updatedEntry)
+
+    // Before we add transactional data, we can't remove the entry.
+    stateManager.clearVerificationStateEntry(producerId)
+    verifyEntry(producerId, updatedEntry)
+
+    // Add the transactional data and clear the entry.
+    append(stateManager, producerId, 0, 0, offset = 0, isTransactional = true)
+    stateManager.clearVerificationStateEntry(producerId)
+    assertEquals(null, stateManager.verificationStateEntry(producerId, false))
+  }
+
+  @Test
+  def testVerificationStateEntryExpiration(): Unit = {
+    val originalEntry = stateManager.verificationStateEntry(producerId, true)
+
+    // Before timeout we do not remove. Note: Accessing the verification entry does not update the time.
+    time.sleep(producerStateManagerConfig.producerIdExpirationMs / 2)
+    stateManager.removeExpiredProducers(time.milliseconds())
+    assertEquals(originalEntry, stateManager.verificationStateEntry(producerId, false))
+
+    time.sleep((producerStateManagerConfig.producerIdExpirationMs / 2) + 1)
+    stateManager.removeExpiredProducers(time.milliseconds())
+    assertEquals(null, stateManager.verificationStateEntry(producerId, false))

Review Comment:
   nit: assertNull



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -3273,17 +3272,35 @@ class PartitionTest extends AbstractPartitionTest {
       baseOffset = 0L,
       producerId = producerId)
     partition.appendRecordsToLeader(idempotentRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
-    assertFalse(partition.hasOngoingTransaction(producerId))
 
-    val transactionRecords = createTransactionalRecords(List(
+    def transactionRecords() = createTransactionalRecords(List(
       new SimpleRecord("k1".getBytes, "v1".getBytes),
       new SimpleRecord("k2".getBytes, "v2".getBytes),
       new SimpleRecord("k3".getBytes, "v3".getBytes)),
       baseOffset = 0L,
       baseSequence = 3,
       producerId = producerId)
-    partition.appendRecordsToLeader(transactionRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
-    assertTrue(partition.hasOngoingTransaction(producerId))
+
+    // When verification guard is not there, we should not be able to append.
+    assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))
+
+    // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null verification object.
+    val verificationGuard = partition.maybeStartTransactionVerification(producerId)
+    assertTrue(verificationGuard != null)

Review Comment:
   nit: assertNotNull



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1255784243


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -578,7 +578,7 @@ class Partition(val topicPartition: TopicPartition,
 
   // Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
   def maybeStartTransactionVerification(producerId: Long): Object = {
-    leaderLogIfLocal match {
+    log match {
       case Some(leaderLog) => leaderLog.maybeStartTransactionVerification(producerId)

Review Comment:
   nit: `leaderLog` -> `log`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1214559856


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -683,6 +704,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       validateAndAssignOffsets = false,
       leaderEpoch = -1,
       requestLocal = None,
+      verificationState = null,

Review Comment:
   Nope. We don't verify followers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] artemlivshits commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "artemlivshits (via GitHub)" <gi...@apache.org>.
artemlivshits commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1213645281


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,9 +579,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  /**
+   * Maybe create and return the verification state for the given producer ID if the transaction is not ongoing. Otherwise return null.
+   */
+  def transactionNeedsVerifying(producerId: Long): Object = lock synchronized {

Review Comment:
   The name implies that it's a function with no side effects, but it actually starts verification by installing some state.  Maybe we should use "maybeStartTransactionVerification" or something like that.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -184,6 +186,27 @@ private void clearProducerIds() {
         producerIdCount = 0;
     }
 
+    /**
+     * Maybe create the VerificationStateEntry for a given producer ID. Return it if it exists, otherwise return null.
+     */
+    public VerificationStateEntry verificationStateEntry(long producerId, boolean createIfAbsent) {
+        return verificationStates.computeIfAbsent(producerId, pid -> {
+            if (createIfAbsent)
+                return new VerificationStateEntry(pid, time.milliseconds());
+            else {
+                log.warn("The given producer ID did not have an entry in the producer state manager, so it's state will be returned as null");

Review Comment:
   Isn't it expected that we find no entry if we got a race condition with a concurrent abort?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1073,7 +1076,8 @@ class ReplicaManager(val config: KafkaConfig,
                                origin: AppendOrigin,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short,
-                               requestLocal: RequestLocal): Map[TopicPartition, LogAppendResult] = {
+                               requestLocal: RequestLocal,
+                               verificationState: Object): Map[TopicPartition, LogAppendResult] = {

Review Comment:
   This should be per partition.  Either this should be a Map[TopicPartition, Object] or it should be added to the entriesPerPartition map.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer-id.
+ * It contains a verificationState object that is used to uniquely identify the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions.
+ * We remove the verification state whenever we write data to the transaction or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+    private final long producerId;
+    private long timestamp;
+    private Object verificationState;

Review Comment:
   The purpose of this to protect atomicity of verification operation from concurrent modifications (sort of 'optimistic locking'), maybe verificationSentinel or verificationGuard or verificationTripwire would be a better name?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1004,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.

Review Comment:
   I think we need to add some detailed comments about this verification, there were so many discussions that lead to these 3 lines of code it's hard to reverse engineer all the reasons behind this logic without comments.  I would write something along the following lines (really big and detailed comment):
   
   The purpose of this code is to make sure we never add a new transactional message for a transaction that is not open in the transaction manager (a.k.a. "hanging transaction").  The main scenario is addressed by sending a verification RPC to the transaction coordinator (TC) that verifies that the transaction is ongoing for this partitions, but there are still some race conditions that need to be handled under the log's lock so that validation is atomic with append:
   1. Producer adds partition to transaction (talks to TC).
   2. Producer sends message to partition leader (this code path).
   3. If this is a new transaction, we create a verficationState object, that serves as a sentinel.
   4. We send verification RPC to TC.
   5. TC verifies that the transaction is ongoing and sends successful reply.
   6. Producer aborts the transaction.
   7. TC sends abort marker to partition leaders.
   8. Abort marker gets written to the log, resets verificationState object.
   9. We try to write the producer message to the log, and we  can see that the verificationState object is reset, so we error out.
   
   If we didn't have the verificationState object validation, then we could write a message at step 9 after the abort marker got written at step 8 creating a "hanging" transaction.
   
   The reason to have a verificationState object and not just isBeingVerified boolean is the following:
   1. Producer adds partition to transaction (talks to TC).
   2. Producer sends message to partition leader (this code path).
   3. Producer retries the same message to partition leader (message arrives on a new connection).
   4. First message arrives, transaction is not started, create verficationState.
   5. We send verification RPC to TC.
   6. TC verifies that the transaction is ongoing and sends successful reply.
   7. Producer aborts the transaction.
   8. TC sends abort marker to partition leaders.
   9. Abort marker gets written to the log, resets verificationState object.
   10. Second produce request (retry) arrives, transaction is not started, create verification
   11. We try to write the first producer message to the log, and we  can see that the verificationState object is different, so we error out.
   
   If we just used a Boolean flag, we could think at the step 11 that there was no state transition, while in fact it was a transition back to the same state (the ABA problem).
   
   If a transaction is already started for this partition (i.e. some messages got written), we don't send RPC to TC, but we need to re-validate that the transaction is still started under the log's lock before insertion, so if we didn't run the validation (i.e. requestVerificationState is null), then the transaction must be still running.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -683,6 +704,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       validateAndAssignOffsets = false,
       leaderEpoch = -1,
       requestLocal = None,
+      verificationState = null,

Review Comment:
   Do we have a case when we need to use verificationState on a follower?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer-id.
+ * It contains a verificationState object that is used to uniquely identify the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions.
+ * We remove the verification state whenever we write data to the transaction or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+    private final long producerId;

Review Comment:
   Do we need this?  I think we already have this info in the map key.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
       val sTime = time.milliseconds
       
       val transactionalProducerIds = mutable.HashSet[Long]()
+      var verificationState: Object = null

Review Comment:
   This should be per partition, here we're just taking a random verificationState that happened to be in the whatever partition got iterated last.  Do we have a unit test that adds batches to multiple partitions in the same transaction?  I would expect it to fail, because verificationState objects of different partitions wouldn't match.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14884: Include check transaction is still ongoing right before append (take 2) [kafka]

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1362327239


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1279,7 +1283,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
-                            requestLocal: RequestLocal): LogAppendInfo = {
+                            requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {

Review Comment:
   I suggested creating a type, but Artem argued that it was smaller and easier to just use Object.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1214664368


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer-id.
+ * It contains a verificationState object that is used to uniquely identify the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions.
+ * We remove the verification state whenever we write data to the transaction or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+    private final long producerId;
+    private long timestamp;
+    private Object verificationState;

Review Comment:
   I've changed to verificationGuard



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1214561227


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -184,6 +186,27 @@ private void clearProducerIds() {
         producerIdCount = 0;
     }
 
+    /**
+     * Maybe create the VerificationStateEntry for a given producer ID. Return it if it exists, otherwise return null.
+     */
+    public VerificationStateEntry verificationStateEntry(long producerId, boolean createIfAbsent) {
+        return verificationStates.computeIfAbsent(producerId, pid -> {
+            if (createIfAbsent)
+                return new VerificationStateEntry(pid, time.milliseconds());
+            else {
+                log.warn("The given producer ID did not have an entry in the producer state manager, so it's state will be returned as null");

Review Comment:
   Yes. But in this case, we will fail the verification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14884: Include check transaction is still ongoing right before append (take 2) [kafka]

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1362352735


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1279,7 +1283,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
-                            requestLocal: RequestLocal): LogAppendInfo = {
+                            requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {

Review Comment:
   @artemlivshits Note that this is Java, so you actually cannot pass an address (many GC implementations move objects and update their addresses).
   
   > so passing anything is perfectly fine - 2 different objects would have 2 different addresses. null works because we know that it is not equal to any real object identity.
   
   Actually this is not true, if you pass "foo" this is an interned string (just as one example).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14884: Include check transaction is still ongoing right before append (take 2) [kafka]

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1361468169


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1279,7 +1283,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
-                            requestLocal: RequestLocal): LogAppendInfo = {
+                            requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {

Review Comment:
   Hmm, why are we passing `Object` here and various other places? This is generally an anti-pattern since _anything_ can be passed even though it may not be valid. You should create a type for the verification guard.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1261443466


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3057,6 +3057,56 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() {
         assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
     }
 
+    @Test
+    public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+        TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 60000, 100, apiVersions);
+
+        setupWithTransactionState(txnManager);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartition(tp0);
+        client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
+        sender.runOnce();
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
+        sender.runOnce();  // send request
+
+        Node node = metadata.fetch().nodes().get(0);
+        time.sleep(2000L);
+        client.disconnect(node.idString(), true);
+        client.backoff(node, 10);
+
+        sender.runOnce(); // now expire the batch.

Review Comment:
   I think it is both? This wording is the same as the other tests in the file that expire batches. (There are 4 others in the file). If i remove one or the other the test doesn't work correctly.
   
   Without the disconnect, we are not really able to receive two responses since I modified the disconnect code. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1255775822


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -793,19 +793,18 @@ private void failBatch(
         Function<Integer, RuntimeException> recordExceptions,
         boolean adjustSequenceNumbers
     ) {
-        if (transactionManager != null) {
-            try {
-                // This call can throw an exception in the rare case that there's an invalid state transition
-                // attempted. Catch these so as not to interfere with the rest of the logic.
-                transactionManager.handleFailedBatch(batch, topLevelException, adjustSequenceNumbers);
-            } catch (Exception e) {
-                log.debug("Encountered error when handling a failed batch", e);
-            }
-        }
-
         this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
 
         if (batch.completeExceptionally(topLevelException, recordExceptions)) {
+            if (transactionManager != null) {

Review Comment:
   Do we need a unit test to cover this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1255779604


##########
server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java:
##########
@@ -118,6 +118,11 @@ protected void pollOnce(long maxTimeoutMs) {
                 // DisconnectException is expected when NetworkClient#initiateClose is called
                 return;
             }
+
+            if (t instanceof InterruptedException && !isRunning()) {

Review Comment:
   Should we add a unit test for this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1256141517


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -578,7 +578,7 @@ class Partition(val topicPartition: TopicPartition,
 
   // Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
   def maybeStartTransactionVerification(producerId: Long): Object = {
-    leaderLogIfLocal match {
+    log match {

Review Comment:
   This was found due to a specific race condition where leadership was elected in the middle of handling the produce request. I can try to recreate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] artemlivshits commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "artemlivshits (via GitHub)" <gi...@apache.org>.
artemlivshits commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1256175697


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -578,7 +578,7 @@ class Partition(val topicPartition: TopicPartition,
 
   // Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
   def maybeStartTransactionVerification(producerId: Long): Object = {
-    leaderLogIfLocal match {
+    log match {

Review Comment:
   What are the cases when log is none?  Could we get into a situation when the log is none at the time of this call and we decide to skip verification, but by the time we actually do produce the log is set up?
   
   I think it would be more robust to require leadership at this point and return an error if it's not a leader yet.  We don't expect to successfully produce to a non-leader, so the error seams reasonable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] artemlivshits commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "artemlivshits (via GitHub)" <gi...@apache.org>.
artemlivshits commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1218658582


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -184,6 +186,27 @@ private void clearProducerIds() {
         producerIdCount = 0;
     }
 
+    /**
+     * Maybe create the VerificationStateEntry for a given producer ID. Return it if it exists, otherwise return null.
+     */
+    public VerificationStateEntry verificationStateEntry(long producerId, boolean createIfAbsent) {
+        return verificationStates.computeIfAbsent(producerId, pid -> {
+            if (createIfAbsent)
+                return new VerificationStateEntry(pid, time.milliseconds());
+            else {
+                log.warn("The given producer ID did not have an entry in the producer state manager, so it's state will be returned as null");

Review Comment:
   Do we need to warn for cases that are expected?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1223292555


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1006,25 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
+          // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
+          // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
+          // There are two phases -- the first append to the log and subsequent appends.
+          //
+          // 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and
+          // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
+          // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
+          // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
+          // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not
+          // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
+          //
+          // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
+          // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
+          // ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
+          // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
+          if (!hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard))

Review Comment:
   I think it is safe since we are just checking the producer state entry (and if it is missing, we will return false)
   But I can place the explicit is transactional check first if it seems clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221910510


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1006,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
+          // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
+          // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
+          // There are two phases -- the first append to the log and subsequent appends.
+          //
+          // 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and
+          // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
+          // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
+          // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
+          // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not
+          // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
+          //
+          // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
+          // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
+          // ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
+          // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled())
+            if (!hasOngoingTransaction(batch.producerId) && (requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null))

Review Comment:
   This is a hold over from the previous pr that also included tentative state. Since we may move that, I can make this all one. (But I may make it a helper since this is quite a lot of logic.) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1221060938


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,6 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  /**
+   * Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
+   * Creation starts the verification process. Otherwise return null.
+   */
+  def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized {
+    if (hasOngoingTransaction(producerId))
+      null
+    else
+      verificationGuard(producerId, true)
+  }
+
+  /**
+   * Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null.
+   */
+  def verificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized {

Review Comment:
   nit: Should we call this `getOrMaybeCreateVerificationGuard`? Could this one be package private? It seems that we only access it externally from tests.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2095,82 +2105,143 @@ class ReplicaManagerTest {
   }
 
   @Test
-  def testVerificationForTransactionalPartitions(): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    val transactionalId = "txn1"
+  def testVerificationForTransactionalPartitionsOnly(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
     val producerId = 24L
     val producerEpoch = 0.toShort
     val sequence = 0
-    
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val metadataCache = mock(classOf[MetadataCache])
+    val node = new Node(0, "host1", 0)
     val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
 
-    val replicaManager = new ReplicaManager(
-      metrics = metrics,
-      config = config,
-      time = time,
-      scheduler = new MockScheduler(time),
-      logManager = mockLogMgr,
-      quotaManagers = quotaManager,
-      metadataCache = metadataCache,
-      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager,
-      addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
-
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1), node)
     try {
-      val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), LeaderAndIsr(1,  List(0, 1)))
-      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      // We must set up the metadata cache to handle the append and verification.
-      val metadataResponseTopic = Seq(new MetadataResponseTopic()
-        .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
-        .setPartitions(Seq(
-          new MetadataResponsePartition()
-            .setPartitionIndex(0)
-            .setLeaderId(0)).asJava))
-      val node = new Node(0, "host1", 0)
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      when(metadataCache.contains(tp)).thenReturn(true)
-      when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic)
-      when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node))
-      when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None)
-      
-      // We will attempt to schedule to the request handler thread using a non request handler thread. Set this to avoid error.
-      KafkaRequestHandler.setBypassThreadCheck(true)
+      // If we supply no transactional ID and idempotent records, we do not verify.
+      val idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecords(replicaManager, tp0, idempotentRecords)
+      verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+
+      // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records.
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      val idempotentRecords2 = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecordsToMultipleTopics(replicaManager, Map(tp0 -> transactionalRecords, tp1 -> idempotentRecords2), transactionalId, Some(0))
+      verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertNotEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(null, getVerificationGuard(replicaManager, tp1, producerId))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testVerificationFlow(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 6
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
       // Append some transactional records.
       val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
-        new SimpleRecord(s"message $sequence".getBytes))
-      val result = appendRecords(replicaManager, tp, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0))
-      
+        new SimpleRecord("message".getBytes))
+
       val transactionToAdd = new AddPartitionsToTxnTransaction()
         .setTransactionalId(transactionalId)
         .setProducerId(producerId)
         .setProducerEpoch(producerEpoch)
         .setVerifyOnly(true)
         .setTopics(new AddPartitionsToTxnTopicCollection(
-          Seq(new AddPartitionsToTxnTopic().setName(tp.topic).setPartitions(Collections.singletonList(tp.partition))).iterator.asJava
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
         ))
-      
-      val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+
       // We should add these partitions to the manager to verify.
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0))
+      val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
       verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+      val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId)
+      assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
 
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
-      callback(Map(tp -> Errors.INVALID_RECORD).toMap)
+      callback(Map(tp0 -> Errors.INVALID_RECORD).toMap)
       assertEquals(Errors.INVALID_RECORD, result.assertFired.error)
+      assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
+
+      // This time verification is successful

Review Comment:
   nit: `.`



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2095,82 +2105,143 @@ class ReplicaManagerTest {
   }
 
   @Test
-  def testVerificationForTransactionalPartitions(): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    val transactionalId = "txn1"
+  def testVerificationForTransactionalPartitionsOnly(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
     val producerId = 24L
     val producerEpoch = 0.toShort
     val sequence = 0
-    
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val metadataCache = mock(classOf[MetadataCache])
+    val node = new Node(0, "host1", 0)
     val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
 
-    val replicaManager = new ReplicaManager(
-      metrics = metrics,
-      config = config,
-      time = time,
-      scheduler = new MockScheduler(time),
-      logManager = mockLogMgr,
-      quotaManagers = quotaManager,
-      metadataCache = metadataCache,
-      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager,
-      addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
-
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1), node)
     try {
-      val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), LeaderAndIsr(1,  List(0, 1)))
-      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      // We must set up the metadata cache to handle the append and verification.
-      val metadataResponseTopic = Seq(new MetadataResponseTopic()
-        .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
-        .setPartitions(Seq(
-          new MetadataResponsePartition()
-            .setPartitionIndex(0)
-            .setLeaderId(0)).asJava))
-      val node = new Node(0, "host1", 0)
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      when(metadataCache.contains(tp)).thenReturn(true)
-      when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic)
-      when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node))
-      when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None)
-      
-      // We will attempt to schedule to the request handler thread using a non request handler thread. Set this to avoid error.
-      KafkaRequestHandler.setBypassThreadCheck(true)
+      // If we supply no transactional ID and idempotent records, we do not verify.
+      val idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecords(replicaManager, tp0, idempotentRecords)
+      verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+
+      // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records.
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      val idempotentRecords2 = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecordsToMultipleTopics(replicaManager, Map(tp0 -> transactionalRecords, tp1 -> idempotentRecords2), transactionalId, Some(0))
+      verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertNotEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(null, getVerificationGuard(replicaManager, tp1, producerId))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testVerificationFlow(): Unit = {

Review Comment:
   nit: Could we come up with a better name? Perhaps `testTransactionVerificationFlow`?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1006,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
+          // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
+          // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
+          // There are two phases -- the first append to the log and subsequent appends.
+          //
+          // 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and
+          // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
+          // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
+          // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
+          // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not
+          // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
+          //
+          // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
+          // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
+          // ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
+          // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled())
+            if (!hasOngoingTransaction(batch.producerId) && (requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null))

Review Comment:
   nit: Any reason why we are using two if statement here? It seems that we could combine them.



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3667,6 +3667,118 @@ class UnifiedLogTest {
     listener.verify(expectedHighWatermark = 4)
   }
 
+  @Test
+  def testTransactionIsOngoingAndVerificationGuard(): Unit = {
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
+
+    val producerId = 23L
+    val producerEpoch = 1.toShort
+    val sequence = 3
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
+    assertFalse(log.hasOngoingTransaction(producerId))
+    assertEquals(null, log.verificationGuard(producerId))
+
+    val idempotentRecords = MemoryRecords.withIdempotentRecords(
+      CompressionType.NONE,
+      producerId,
+      producerEpoch,
+      sequence,
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )
+
+    val verificationGuard = log.maybeStartTransactionVerification(producerId)
+    assertTrue(verificationGuard != null)
+
+    log.appendAsLeader(idempotentRecords, leaderEpoch = 0)
+    assertFalse(log.hasOngoingTransaction(producerId))
+
+    // Since we wrote idempotent records, we keep verification guard.
+    assertEquals(verificationGuard, log.verificationGuard(producerId))
+
+    val transactionalRecords = MemoryRecords.withTransactionalRecords(
+      CompressionType.NONE,
+      producerId,
+      producerEpoch,
+      sequence + 2,
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )
+
+    log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
+    assertTrue(log.hasOngoingTransaction(producerId))
+    // Verification guard should be cleared now.
+    assertEquals(null, log.verificationGuard(producerId))
+
+    // A subsequent maybeStartTransactionVerification will be empty since we are already verified.
+    assertEquals(null, log.maybeStartTransactionVerification(producerId))

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3667,6 +3667,118 @@ class UnifiedLogTest {
     listener.verify(expectedHighWatermark = 4)
   }
 
+  @Test
+  def testTransactionIsOngoingAndVerificationGuard(): Unit = {
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
+
+    val producerId = 23L
+    val producerEpoch = 1.toShort
+    val sequence = 3
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
+    assertFalse(log.hasOngoingTransaction(producerId))
+    assertEquals(null, log.verificationGuard(producerId))
+
+    val idempotentRecords = MemoryRecords.withIdempotentRecords(
+      CompressionType.NONE,
+      producerId,
+      producerEpoch,
+      sequence,
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )
+
+    val verificationGuard = log.maybeStartTransactionVerification(producerId)
+    assertTrue(verificationGuard != null)
+
+    log.appendAsLeader(idempotentRecords, leaderEpoch = 0)
+    assertFalse(log.hasOngoingTransaction(producerId))
+
+    // Since we wrote idempotent records, we keep verification guard.
+    assertEquals(verificationGuard, log.verificationGuard(producerId))
+
+    val transactionalRecords = MemoryRecords.withTransactionalRecords(
+      CompressionType.NONE,
+      producerId,
+      producerEpoch,
+      sequence + 2,
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )
+
+    log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
+    assertTrue(log.hasOngoingTransaction(producerId))
+    // Verification guard should be cleared now.
+    assertEquals(null, log.verificationGuard(producerId))

Review Comment:
   nit: assertNull



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3667,6 +3667,118 @@ class UnifiedLogTest {
     listener.verify(expectedHighWatermark = 4)
   }
 
+  @Test
+  def testTransactionIsOngoingAndVerificationGuard(): Unit = {
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
+
+    val producerId = 23L
+    val producerEpoch = 1.toShort
+    val sequence = 3
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
+    assertFalse(log.hasOngoingTransaction(producerId))
+    assertEquals(null, log.verificationGuard(producerId))
+
+    val idempotentRecords = MemoryRecords.withIdempotentRecords(
+      CompressionType.NONE,
+      producerId,
+      producerEpoch,
+      sequence,
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )
+
+    val verificationGuard = log.maybeStartTransactionVerification(producerId)
+    assertTrue(verificationGuard != null)
+
+    log.appendAsLeader(idempotentRecords, leaderEpoch = 0)
+    assertFalse(log.hasOngoingTransaction(producerId))
+
+    // Since we wrote idempotent records, we keep verification guard.
+    assertEquals(verificationGuard, log.verificationGuard(producerId))
+
+    val transactionalRecords = MemoryRecords.withTransactionalRecords(
+      CompressionType.NONE,
+      producerId,
+      producerEpoch,
+      sequence + 2,
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )
+
+    log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
+    assertTrue(log.hasOngoingTransaction(producerId))
+    // Verification guard should be cleared now.
+    assertEquals(null, log.verificationGuard(producerId))
+
+    // A subsequent maybeStartTransactionVerification will be empty since we are already verified.
+    assertEquals(null, log.maybeStartTransactionVerification(producerId))
+
+    val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
+      producerId,
+      producerEpoch,
+      new EndTransactionMarker(ControlRecordType.COMMIT, 0)
+    )
+
+    log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
+    assertFalse(log.hasOngoingTransaction(producerId))
+    assertEquals(null, log.verificationGuard(producerId))

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3667,6 +3667,118 @@ class UnifiedLogTest {
     listener.verify(expectedHighWatermark = 4)
   }
 
+  @Test
+  def testTransactionIsOngoingAndVerificationGuard(): Unit = {
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
+
+    val producerId = 23L
+    val producerEpoch = 1.toShort
+    val sequence = 3
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
+    assertFalse(log.hasOngoingTransaction(producerId))
+    assertEquals(null, log.verificationGuard(producerId))
+
+    val idempotentRecords = MemoryRecords.withIdempotentRecords(
+      CompressionType.NONE,
+      producerId,
+      producerEpoch,
+      sequence,
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )
+
+    val verificationGuard = log.maybeStartTransactionVerification(producerId)
+    assertTrue(verificationGuard != null)
+
+    log.appendAsLeader(idempotentRecords, leaderEpoch = 0)
+    assertFalse(log.hasOngoingTransaction(producerId))
+
+    // Since we wrote idempotent records, we keep verification guard.
+    assertEquals(verificationGuard, log.verificationGuard(producerId))
+
+    val transactionalRecords = MemoryRecords.withTransactionalRecords(
+      CompressionType.NONE,
+      producerId,
+      producerEpoch,
+      sequence + 2,
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )
+
+    log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
+    assertTrue(log.hasOngoingTransaction(producerId))
+    // Verification guard should be cleared now.
+    assertEquals(null, log.verificationGuard(producerId))
+
+    // A subsequent maybeStartTransactionVerification will be empty since we are already verified.
+    assertEquals(null, log.maybeStartTransactionVerification(producerId))
+
+    val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
+      producerId,
+      producerEpoch,
+      new EndTransactionMarker(ControlRecordType.COMMIT, 0)
+    )
+
+    log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
+    assertFalse(log.hasOngoingTransaction(producerId))
+    assertEquals(null, log.verificationGuard(producerId))
+
+    // A new maybeStartTransactionVerification will not be empty, as we need to verify the next transaction.
+    val newVerificationGuard = log.maybeStartTransactionVerification(producerId)
+    assertTrue(newVerificationGuard != null)

Review Comment:
   nit: assertNotNull. There are other cases in this file.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2095,82 +2105,143 @@ class ReplicaManagerTest {
   }
 
   @Test
-  def testVerificationForTransactionalPartitions(): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    val transactionalId = "txn1"
+  def testVerificationForTransactionalPartitionsOnly(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
     val producerId = 24L
     val producerEpoch = 0.toShort
     val sequence = 0
-    
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val metadataCache = mock(classOf[MetadataCache])
+    val node = new Node(0, "host1", 0)
     val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
 
-    val replicaManager = new ReplicaManager(
-      metrics = metrics,
-      config = config,
-      time = time,
-      scheduler = new MockScheduler(time),
-      logManager = mockLogMgr,
-      quotaManagers = quotaManager,
-      metadataCache = metadataCache,
-      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager,
-      addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
-
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1), node)
     try {
-      val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), LeaderAndIsr(1,  List(0, 1)))
-      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      // We must set up the metadata cache to handle the append and verification.
-      val metadataResponseTopic = Seq(new MetadataResponseTopic()
-        .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
-        .setPartitions(Seq(
-          new MetadataResponsePartition()
-            .setPartitionIndex(0)
-            .setLeaderId(0)).asJava))
-      val node = new Node(0, "host1", 0)
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      when(metadataCache.contains(tp)).thenReturn(true)
-      when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic)
-      when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node))
-      when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None)
-      
-      // We will attempt to schedule to the request handler thread using a non request handler thread. Set this to avoid error.
-      KafkaRequestHandler.setBypassThreadCheck(true)
+      // If we supply no transactional ID and idempotent records, we do not verify.
+      val idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecords(replicaManager, tp0, idempotentRecords)
+      verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))

Review Comment:
   There are other cases here as well.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2095,82 +2105,143 @@ class ReplicaManagerTest {
   }
 
   @Test
-  def testVerificationForTransactionalPartitions(): Unit = {
-    val tp = new TopicPartition(topic, 0)
-    val transactionalId = "txn1"
+  def testVerificationForTransactionalPartitionsOnly(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
     val producerId = 24L
     val producerEpoch = 0.toShort
     val sequence = 0
-    
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val metadataCache = mock(classOf[MetadataCache])
+    val node = new Node(0, "host1", 0)
     val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
 
-    val replicaManager = new ReplicaManager(
-      metrics = metrics,
-      config = config,
-      time = time,
-      scheduler = new MockScheduler(time),
-      logManager = mockLogMgr,
-      quotaManagers = quotaManager,
-      metadataCache = metadataCache,
-      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager,
-      addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
-
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1), node)
     try {
-      val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), LeaderAndIsr(1,  List(0, 1)))
-      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      // We must set up the metadata cache to handle the append and verification.
-      val metadataResponseTopic = Seq(new MetadataResponseTopic()
-        .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
-        .setPartitions(Seq(
-          new MetadataResponsePartition()
-            .setPartitionIndex(0)
-            .setLeaderId(0)).asJava))
-      val node = new Node(0, "host1", 0)
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
-      when(metadataCache.contains(tp)).thenReturn(true)
-      when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic)
-      when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node))
-      when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None)
-      
-      // We will attempt to schedule to the request handler thread using a non request handler thread. Set this to avoid error.
-      KafkaRequestHandler.setBypassThreadCheck(true)
+      // If we supply no transactional ID and idempotent records, we do not verify.
+      val idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecords(replicaManager, tp0, idempotentRecords)
+      verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+
+      // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records.
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      val idempotentRecords2 = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      appendRecordsToMultipleTopics(replicaManager, Map(tp0 -> transactionalRecords, tp1 -> idempotentRecords2), transactionalId, Some(0))
+      verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), any[AddPartitionsToTxnManager.AppendCallback]())
+      assertNotEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+      assertEquals(null, getVerificationGuard(replicaManager, tp1, producerId))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testVerificationFlow(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 6
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
 
       // Append some transactional records.
       val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
-        new SimpleRecord(s"message $sequence".getBytes))
-      val result = appendRecords(replicaManager, tp, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0))
-      
+        new SimpleRecord("message".getBytes))
+
       val transactionToAdd = new AddPartitionsToTxnTransaction()
         .setTransactionalId(transactionalId)
         .setProducerId(producerId)
         .setProducerEpoch(producerEpoch)
         .setVerifyOnly(true)
         .setTopics(new AddPartitionsToTxnTopicCollection(
-          Seq(new AddPartitionsToTxnTopic().setName(tp.topic).setPartitions(Collections.singletonList(tp.partition))).iterator.asJava
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
         ))
-      
-      val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+
       // We should add these partitions to the manager to verify.
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0))
+      val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
       verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+      val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId)
+      assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
 
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
-      callback(Map(tp -> Errors.INVALID_RECORD).toMap)
+      callback(Map(tp0 -> Errors.INVALID_RECORD).toMap)
       assertEquals(Errors.INVALID_RECORD, result.assertFired.error)
+      assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
+
+      // This time verification is successful
+      appendRecords(replicaManager, tp0, transactionalRecords)
+      val appendCallback2 = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+      verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback2.capture())
+      assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
+
+      val callback2: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
+      callback2(Map.empty[TopicPartition, Errors].toMap)
+      assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
+      assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId))
+    } finally {
+      replicaManager.shutdown()
+    }
 
-      // If we supply no transactional ID and idempotent records, we do not verify, so counter stays the same.
-      val idempotentRecords2 = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1,
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testVerificationGuardOnMultiplePartitions(): Unit = {

Review Comment:
   nit: `TransactionVerification` as well?



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3667,6 +3667,118 @@ class UnifiedLogTest {
     listener.verify(expectedHighWatermark = 4)
   }
 
+  @Test
+  def testTransactionIsOngoingAndVerificationGuard(): Unit = {
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
+
+    val producerId = 23L
+    val producerEpoch = 1.toShort
+    val sequence = 3
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
+    assertFalse(log.hasOngoingTransaction(producerId))
+    assertEquals(null, log.verificationGuard(producerId))
+
+    val idempotentRecords = MemoryRecords.withIdempotentRecords(
+      CompressionType.NONE,
+      producerId,
+      producerEpoch,
+      sequence,
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )
+
+    val verificationGuard = log.maybeStartTransactionVerification(producerId)
+    assertTrue(verificationGuard != null)

Review Comment:
   nit: assertNotNull



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -3273,17 +3272,35 @@ class PartitionTest extends AbstractPartitionTest {
       baseOffset = 0L,
       producerId = producerId)
     partition.appendRecordsToLeader(idempotentRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
-    assertFalse(partition.hasOngoingTransaction(producerId))
 
-    val transactionRecords = createTransactionalRecords(List(
+    def transactionRecords() = createTransactionalRecords(List(
       new SimpleRecord("k1".getBytes, "v1".getBytes),
       new SimpleRecord("k2".getBytes, "v2".getBytes),
       new SimpleRecord("k3".getBytes, "v3".getBytes)),
       baseOffset = 0L,
       baseSequence = 3,
       producerId = producerId)
-    partition.appendRecordsToLeader(transactionRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
-    assertTrue(partition.hasOngoingTransaction(producerId))
+
+    // When verification guard is not there, we should not be able to append.
+    assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))
+
+    // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null verification object.
+    val verificationGuard = partition.maybeStartTransactionVerification(producerId)
+    assertTrue(verificationGuard != null)
+
+    // With the wrong verification guard, append should fail.
+    assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(),
+      origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, Optional.of(new Object)))
+
+    // We should return the same verification object when we still need to verify. Append should proceed.
+    val verificationGuard2 = partition.maybeStartTransactionVerification(producerId)
+    assertEquals(verificationGuard, verificationGuard2)
+    partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, verificationGuard)
+
+    // We should no longer need a verification object. Future appends without verification guard will also succeed.
+    val verificationGuard3 = partition.maybeStartTransactionVerification(producerId)
+    assertEquals(null, verificationGuard3)

Review Comment:
   nit: You could use `assertNull`. There are a few other cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1222426539


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1006,25 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
+          // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
+          // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
+          // There are two phases -- the first append to the log and subsequent appends.
+          //
+          // 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and
+          // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
+          // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
+          // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
+          // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not
+          // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
+          //
+          // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
+          // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
+          // ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
+          // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
+          if (!hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard))

Review Comment:
   Is it safe to call hasOngoingTransaction when the batch is not transactional? It may be better to keep checking batch.isTransactional first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1224505811


##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -1087,6 +1087,45 @@ class ProducerStateManagerTest {
     assertTrue(!manager.latestSnapshotOffset.isPresent)
   }
 
+  @Test
+  def testEntryForVerification(): Unit = {
+    val originalEntry = stateManager.verificationStateEntry(producerId, true)
+    val originalEntryVerificationGuard = originalEntry.verificationGuard()
+
+    def verifyEntry(producerId: Long, newEntry: VerificationStateEntry): Unit = {
+      val entry = stateManager.verificationStateEntry(producerId, false)
+      assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
+      assertEquals(entry.verificationGuard, newEntry.verificationGuard)
+    }
+
+    // If we already have an entry, reuse it.
+    val updatedEntry = stateManager.verificationStateEntry(producerId, true)
+    verifyEntry(producerId, updatedEntry)
+
+    // Before we add transactional data, we can't remove the entry.
+    stateManager.clearVerificationStateEntry(producerId)
+    verifyEntry(producerId, updatedEntry)
+
+    // Add the transactional data and clear the entry.
+    append(stateManager, producerId, 0, 0, offset = 0, isTransactional = true)
+    stateManager.clearVerificationStateEntry(producerId)
+    assertEquals(null, stateManager.verificationStateEntry(producerId, false))

Review Comment:
   Or just all the tests 🙃 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1256140857


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -793,19 +793,18 @@ private void failBatch(
         Function<Integer, RuntimeException> recordExceptions,
         boolean adjustSequenceNumbers
     ) {
-        if (transactionManager != null) {
-            try {
-                // This call can throw an exception in the rare case that there's an invalid state transition
-                // attempted. Catch these so as not to interfere with the rest of the logic.
-                transactionManager.handleFailedBatch(batch, topLevelException, adjustSequenceNumbers);
-            } catch (Exception e) {
-                log.debug("Encountered error when handling a failed batch", e);
-            }
-        }
-
         this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
 
         if (batch.completeExceptionally(topLevelException, recordExceptions)) {
+            if (transactionManager != null) {

Review Comment:
   Hmmm -- I can probably come up with something...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1260813174


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3057,6 +3057,56 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() {
         assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
     }
 
+    @Test
+    public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+        TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 60000, 100, apiVersions);
+
+        setupWithTransactionState(txnManager);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartition(tp0);
+        client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
+        sender.runOnce();
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
+        sender.runOnce();  // send request
+
+        Node node = metadata.fetch().nodes().get(0);
+        time.sleep(2000L);

Review Comment:
   Is this sleep to get past the delivery timeout?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -707,11 +707,15 @@ class ReplicaManager(val config: KafkaConfig,
       val sTime = time.milliseconds
 
       val verificationGuards: mutable.Map[TopicPartition, Object] = mutable.Map[TopicPartition, Object]()
-      val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition) =
+      val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition, errorsPerPartition) =
         if (transactionStatePartition.isEmpty || !config.transactionPartitionVerificationEnable)
-          (entriesPerPartition, Map.empty)
+          (entriesPerPartition, Map.empty[TopicPartition, MemoryRecords], Map.empty[TopicPartition, Errors])
         else {
-          partitionEntriesForVerification(verificationGuards, entriesPerPartition)
+          val verifiedEntries: mutable.Map[TopicPartition, MemoryRecords] = mutable.Map[TopicPartition, MemoryRecords]()
+          val unverifiedEntries: mutable.Map[TopicPartition, MemoryRecords] = mutable.Map[TopicPartition, MemoryRecords]()
+          val errorEntries: mutable.Map[TopicPartition, Errors] = mutable.Map[TopicPartition, Errors]()

Review Comment:
   nit: You can probably remove the types for those three.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3057,6 +3057,56 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() {
         assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
     }
 
+    @Test
+    public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+        TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 60000, 100, apiVersions);
+
+        setupWithTransactionState(txnManager);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartition(tp0);
+        client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
+        sender.runOnce();
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
+        sender.runOnce();  // send request
+
+        Node node = metadata.fetch().nodes().get(0);
+        time.sleep(2000L);
+        client.disconnect(node.idString(), true);
+        client.backoff(node, 10);
+
+        sender.runOnce(); // now expire the batch.

Review Comment:
   For my understanding, does the batch fail because of the disconnect or because of the delivery timeout here? The word `expire` makes it unclean to me.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2316,6 +2316,41 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testTransactionVerificationWhenNotLeader(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 6
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node)
+    try {
+      // Append some transactional records.
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      // We should not add these partitions to the manager to verify, but instead throw an error.
+      appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0)).onFire( response => {
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, response.error)
+      })
+      verify(addPartitionsToTxnManager, times(0)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), any[AddPartitionsToTxnManager.AppendCallback]())
+

Review Comment:
   nit: We could remove this empty line.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2316,6 +2316,41 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testTransactionVerificationWhenNotLeader(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 6
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), node)
+    try {
+      // Append some transactional records.
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      // We should not add these partitions to the manager to verify, but instead throw an error.
+      appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0)).onFire( response => {

Review Comment:
   nit: There is an extra space before `response`. You may be able to do `onFire { response =>` as well.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3057,6 +3057,56 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() {
         assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
     }
 
+    @Test
+    public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+        TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 60000, 100, apiVersions);
+
+        setupWithTransactionState(txnManager);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartition(tp0);
+        client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
+        sender.runOnce();
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
+        sender.runOnce();  // send request
+
+        Node node = metadata.fetch().nodes().get(0);
+        time.sleep(2000L);
+        client.disconnect(node.idString(), true);
+        client.backoff(node, 10);
+
+        sender.runOnce(); // now expire the batch.
+        assertFutureFailure(request1, TimeoutException.class);
+
+        time.sleep(20);
+
+        sendIdempotentProducerResponse(0, tp0, Errors.INVALID_RECORD, -1);
+        sender.runOnce(); // receive late response
+
+        // Loop once and confirm that the transaction manager does not enter a fatal error state
+        sender.runOnce();
+        assertTrue(txnManager.hasAbortableError());
+        TransactionalRequestResult result = txnManager.beginAbort();
+        sender.runOnce();
+
+        respondToEndTxn(Errors.NONE);
+        sender.runOnce();
+        assertTrue(txnManager::isInitializing);

Review Comment:
   nit: Any reason why you use `txnManager::isInitializing` vs `txnManager.isInitializing()` here and at L3102?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1261385072


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -3057,6 +3057,56 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() {
         assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
     }
 
+    @Test
+    public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+        TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 60000, 100, apiVersions);
+
+        setupWithTransactionState(txnManager);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartition(tp0);
+        client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
+        sender.runOnce();
+
+        // Send first ProduceRequest
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
+        sender.runOnce();  // send request
+
+        Node node = metadata.fetch().nodes().get(0);
+        time.sleep(2000L);

Review Comment:
   If we don't sleep, then the future does not complete. There is another test in the file that explains this only expires the batch if it hits the delivery timeout. Note in the example, time.sleep(1000) was already called.
   
   ```
           // We add 600 millis to expire the first batch but not the second.
           // Note deliveryTimeoutMs is 1500.
            time.sleep(600L);
           client.disconnect(node.idString());
           client.backoff(node, 10);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on PR #13787:
URL: https://github.com/apache/kafka/pull/13787#issuecomment-1599609852

   I'm getting tripped up on some flaky tests -- namely 
   [kafka.api.TransactionsBounceTest.testWithGroupId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13787/13/testReport/junit/kafka.api/TransactionsBounceTest/testWithGroupId__/)
   [kafka.api.TransactionsBounceTest.testWithGroupId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13787/13/testReport/junit/kafka.api/TransactionsBounceTest/testWithGroupId___2/)
   [kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13787/13/testReport/junit/kafka.api/TransactionsTest/testBumpTransactionalEpoch_String__quorum_kraft/)
   [kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13787/13/testReport/junit/kafka.api/TransactionsTest/testBumpTransactionalEpoch_String__quorum_kraft_2/)
   
   Both involve restarting brokers, so I suspect I'm missing some logic there. testBumpTransactionalEpoch was failing before, but with a different error, so I've filed https://issues.apache.org/jira/browse/KAFKA-15099 for that. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14884: Include check transaction is still ongoing right before append (take 2) [kafka]

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1362352735


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1279,7 +1283,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
-                            requestLocal: RequestLocal): LogAppendInfo = {
+                            requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {

Review Comment:
   @artemlivshits Note that this is Java, so you actually cannot pass an address (many GC implementations move objects and update their addresses).
   
   > so passing anything is perfectly fine - 2 different objects would have 2 different addresses. null works because we know that it is not equal to any real object identity.
   
   Actually this is not true, if you pass a literal string (eg "foo") it is interned (just as one example).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14884: Include check transaction is still ongoing right before append (take 2) [kafka]

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1362327239


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1279,7 +1283,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
-                            requestLocal: RequestLocal): LogAppendInfo = {
+                            requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {

Review Comment:
   I suggested creating a type, but Artem argued that it was easier to just use Object.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org