You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "jeffkbkim (via GitHub)" <gi...@apache.org> on 2023/02/17 01:38:31 UTC

[GitHub] [kafka] jeffkbkim opened a new pull request, #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

jeffkbkim opened a new pull request, #13267:
URL: https://github.com/apache/kafka/pull/13267

   RPCProducerIdManager initiates an async request to the controller to grab a block of producer IDs and then blocks waiting for a response from the controller.
   
   This is done in the request handler threads while holding a global lock. This means that if many producers are requesting producer IDs and the controller is slow to respond, many threads can get stuck waiting for the lock.
   
   This patch aims to:
   * resolve the deadlock scenario mentioned above by not waiting for a new block and returning an error immediately
   * remove synchronization usages in RPCProducerIdManager.generateProducerId()
   * handle errors returned from generateProducerId()
   * confirm that producers back off before retrying
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1204862724


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +130,107 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  private val IterationLimit = 3
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val backoffDeadlineMs = new AtomicLong(NoRetry)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    var iteration = 0
+    while (result == null) {
+      currentProducerIdBlock.get.claimNextId().asScala match {
+        case None =>
+          // Check the next block if current block is full
+          val block = nextProducerIdBlock.getAndSet(null)
+          if (block == null) {
+            // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
+            // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+            maybeRequestNextBlock()
+            result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
+          } else {
+            currentProducerIdBlock.set(block)
+            requestInFlight.set(false)
+            iteration = iteration + 1
+          }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+        case Some(nextProducerId) =>
+          // Check if we need to prefetch the next block
+          val prefetchTarget = currentProducerIdBlock.get.firstProducerId + (currentProducerIdBlock.get.size * ProducerIdManager.PidPrefetchThreshold).toLong
+          if (nextProducerId == prefetchTarget) {
+            maybeRequestNextBlock()
           }
-        }
+          result = Success(nextProducerId)
+      }
+      if (iteration == IterationLimit) {
+        result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
       }
-      nextProducerId
     }
+    result
   }
 
 
-  private def maybeRequestNextBlock(): Unit = {
-    if (nextProducerIdBlock.isEmpty && requestInFlight.compareAndSet(false, true)) {
-      sendRequest()
+  // Visible for testing
+  private[transaction] def maybeRequestNextBlock(): Unit = {
+    if (nextProducerIdBlock.get == null &&
+      requestInFlight.compareAndSet(false, true) ) {

Review Comment:
   Thanks! That makes much more sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110472349


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+    val currentBlockCount = blockCount.get
+    currentProducerIdBlock.get.claimNextId().asScala match {
+      case None =>
+        // Check the next block if current block is full
+        val block = nextProducerIdBlock.getAndSet(null)
         if (block == null) {
           // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
           // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
+          maybeRequestNextBlock(currentBlockCount)
+          Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
         } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+          // Fence other threads from sending another AllocateProducerIdsRequest
+          blockCount.incrementAndGet()

Review Comment:
   I was unable to confirm this through testing, but I still think there can be a race. Consider this scenario:
   
   block size = 10 for simplicity
   3 threads: t1, t2, and t3 all enter `generateProducerId()`. they start with `blockCount=1`
   `currentBlock (currentProducerIdBlock) = [0, 9, 9]` --> this represents `[first id, next pid to return, last id]`
   `nextBlock (nextProducerIdBlock) = null`
   
    
   - All 3 threads try to claim a pid; only one succeeds. Let's assume `t2` claimed the last pid, 9.
   - `t1` sees that `nextBlock` is empty, requests a new block, and returns an error.
   - `t1` handles another request, enters `generateProducerId()` at `blockCount=1`
   - broker to controller thread updates `nextBlock=[10, 10, 19]`
   - `t1` obtains `nextBlock [10, 10, 19]`. `nextBlock` is now `null` (L182)
   - before `t1` increments `blockCount` (L190), `t2` (nextPid=9) checks prefetch criteria (L202). 
   - `prefetchTarget = 0 + 10 * 0.9 = 9`  so `t2` invokes `maybeRequestNextBlock(blockCount=1)`. this passes because `requestInFlight == false`, `blockCount == 1`, and `nextBlock == null`
   - `t2` returns pid 9.
   - `t1` increments `blockCount=2`, sets `currentBlock=[10, 10, 19]`, and returns pid 10.
   - broker to controller thread updates `nextBlock=[20, 20, 29]` which is the prefetched block
   - `t3` now enters L180 since it failed to claim the pid when the current block pointed at `[0, 9, 9]`
   - `t3` obtains `nextBlock` and sets `currentBlock` to `[20, 20, 29]`
   - we lose an entire block `[10, 10, 19]`
   
   It might not be an entire block, but this case will lead to blocks not being fully exhausted. This is even rarer in practice because the block sizes are larger and `t3` would have to be idle for a longer period of time.
   
   I don't think this is possible with 2 threads, because we need 2 separate threads to fetch the 2 blocks plus another thread that observes the initial block [0, 9, 9] and replaces it with [20, 20, 29].
   
   I'm not sure how to fix the 2 consecutive fetches, but for replacing blocks we can use `blockCount` and confirm that the thread's `blockCount` equals the current `blockCount` before replacing. Then `t3` will know that its view is outdated in the example above.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1177133004


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    while (result == null) {

Review Comment:
   This was roughly the original code, which Jason commented on in https://github.com/apache/kafka/pull/13267#discussion_r1169038238
   
   > This part seems unsafe. As soon as we set this, other threads can begin accessing the block. It seems possible, if unlikely, that claimNextId fails to allocate. I think it would be simpler to set currentProducerIdBlock and loop. Or if we don't like the loop, then just return the coordinator loading error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1206145989


##########
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##########
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+    // Send concurrent generateProducerId requests. Ensure that the generated producer id is unique.
+    // For each block (total 3 blocks), only "idBlockLen" number of requests should go through.
+    // All other requests should fail immediately.
+
+    val numThreads = 5
+    val latch = new CountDownLatch(idBlockLen * 3)
     val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-    IntStream.range(0, idBlockLen * 3).forEach { i =>
-      assertEquals(i, manager.generateProducerId())
+    val pidMap = mutable.Map[Long, Int]()
+    val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+    for ( _ <- 0 until numThreads) {
+      requestHandlerThreadPool.submit(() => {
+        while(latch.getCount > 0) {
+          val result = manager.generateProducerId()
+          result match {
+            case Success(pid) =>
+              pidMap synchronized {
+                if (latch.getCount != 0) {
+                  val counter = pidMap.getOrElse(pid, 0)
+                  pidMap += pid -> (counter + 1)
+                  latch.countDown()
+                }
+              }
+
+            case Failure(exception) =>
+              assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass)
+          }
+          Thread.sleep(100)
+        }
+      }, 0)
+    }
+    assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   It takes roughly 6 seconds. I have lowered the timeout to 10s.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1207028405


##########
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##########
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+    // Send concurrent generateProducerId requests. Ensure that the generated producer id is unique.
+    // For each block (total 3 blocks), only "idBlockLen" number of requests should go through.
+    // All other requests should fail immediately.
+
+    val numThreads = 5
+    val latch = new CountDownLatch(idBlockLen * 3)
     val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-    IntStream.range(0, idBlockLen * 3).forEach { i =>
-      assertEquals(i, manager.generateProducerId())
+    val pidMap = mutable.Map[Long, Int]()
+    val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+    for ( _ <- 0 until numThreads) {
+      requestHandlerThreadPool.submit(() => {
+        while(latch.getCount > 0) {
+          val result = manager.generateProducerId()
+          result match {
+            case Success(pid) =>
+              pidMap synchronized {
+                if (latch.getCount != 0) {
+                  val counter = pidMap.getOrElse(pid, 0)
+                  pidMap += pid -> (counter + 1)
+                  latch.countDown()
+                }
+              }
+
+            case Failure(exception) =>
+              assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass)
+          }
+          Thread.sleep(100)
+        }
+      }, 0)
+    }
+    assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   I would keep it a bit higher so that it does not become flaky. Have you run it a few times on your own to make sure it is not flaky already?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1169101298


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)

Review Comment:
   Yes. Without a sense of "epoch", we can send additional controller requests right after a block is set, causing us to move forward without fully exhausting the existing block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] artemlivshits commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "artemlivshits (via GitHub)" <gi...@apache.org>.
artemlivshits commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1179541960


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    while (result == null) {

Review Comment:
   But we allocate the id before we set `currentProducerIdBlock`, so no other thread can see the new block until we have successfully allocated an id from it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1206088849


##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala:
##########
@@ -61,27 +63,59 @@ class ProducerIdsIntegrationTest {
     clusterInstance.stop()
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO)
+  @Timeout(20)
+  def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: ClusterInstance): Unit = {
+    clusterInstance.config().serverProperties().put(KafkaConfig.NumIoThreadsProp, "1")
+    clusterInstance.start()
+    verifyUniqueIds(clusterInstance)
+    clusterInstance.stop()
+  }
+
+  @Disabled // TODO: Enable once producer id block size is configurable

Review Comment:
   Can we replace the TODO with a JIRA ticket?



##########
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##########
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+    // Send concurrent generateProducerId requests. Ensure that the generated producer id is unique.
+    // For each block (total 3 blocks), only "idBlockLen" number of requests should go through.
+    // All other requests should fail immediately.
+
+    val numThreads = 5
+    val latch = new CountDownLatch(idBlockLen * 3)
     val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-    IntStream.range(0, idBlockLen * 3).forEach { i =>
-      assertEquals(i, manager.generateProducerId())
+    val pidMap = mutable.Map[Long, Int]()
+    val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+    for ( _ <- 0 until numThreads) {
+      requestHandlerThreadPool.submit(() => {
+        while(latch.getCount > 0) {
+          val result = manager.generateProducerId()
+          result match {
+            case Success(pid) =>
+              pidMap synchronized {
+                if (latch.getCount != 0) {
+                  val counter = pidMap.getOrElse(pid, 0)
+                  pidMap += pid -> (counter + 1)
+                  latch.countDown()
+                }
+              }
+
+            case Failure(exception) =>
+              assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass)
+          }
+          Thread.sleep(100)
+        }
+      }, 0)
+    }
+    assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   How long does this test take?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -126,17 +132,22 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
     } else {
       val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).flatMap {
         case None =>
-          val producerId = producerIdManager.generateProducerId()
-          val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
-            producerId = producerId,
-            lastProducerId = RecordBatch.NO_PRODUCER_ID,
-            producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
-            lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
-            txnTimeoutMs = transactionTimeoutMs,
-            state = Empty,
-            topicPartitions = collection.mutable.Set.empty[TopicPartition],
-            txnLastUpdateTimestamp = time.milliseconds())
-          txnManager.putTransactionStateIfNotExists(createdMetadata)
+          val result = producerIdManager.generateProducerId()

Review Comment:
   nit: maybe we don't need `result`. Perhaps a little more concise to match on `producerIdManager.generateProducerId()`? Same comment below.



##########
server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java:
##########
@@ -32,11 +34,25 @@ public class ProducerIdsBlock {
     private final int assignedBrokerId;
     private final long firstProducerId;
     private final int blockSize;
+    private final AtomicLong producerIdCounter;
 
     public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) {
         this.assignedBrokerId = assignedBrokerId;
         this.firstProducerId = firstProducerId;
         this.blockSize = blockSize;
+        producerIdCounter = new AtomicLong(firstProducerId);
+    }
+
+    /**
+     * Claim the next available producer id from the block.
+     * Returns an empty result if there are no more available producer ids in the block.
+     */
+    public Optional<Long> claimNextId() {
+        long nextId = producerIdCounter.getAndIncrement();
+        if (nextId > lastProducerId()) {

Review Comment:
   This first check is duplicated below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1169036716


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)

Review Comment:
   I did not follow why we need this. It seems like it's being used to prevent concurrent sends?



##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+    val currentBlockCount = blockCount.get
+    currentProducerIdBlock.get.claimNextId().asScala match {
+      case None =>
+        // Check the next block if current block is full
+        val block = nextProducerIdBlock.getAndSet(null)
         if (block == null) {
           // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
           // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
+          maybeRequestNextBlock(currentBlockCount)
+          Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
         } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+          // Fence other threads from sending another AllocateProducerIdsRequest
+          blockCount.incrementAndGet()
+          currentProducerIdBlock.set(block)

Review Comment:
   This part seems unsafe. As soon as we set this, other threads can begin accessing the block. It seems possible, if unlikely, that `claimNextId` fails to allocate. I think it would be simpler to set `currentProducerIdBlock` and loop. Or if we don't like the loop, then just return the coordinator loading error.



##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -207,37 +235,39 @@ class RPCProducerIdManager(brokerId: Int,
     })
   }
 
+  // Visible for testing
   private[transaction] def handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit = {
-    requestInFlight.set(false)
     val data = response.data
+    var successfulResponse = false
     Errors.forCode(data.errorCode()) match {
       case Errors.NONE =>
         debug(s"Got next producer ID block from controller $data")
         // Do some sanity checks on the response
-        if (data.producerIdStart() < currentProducerIdBlock.lastProducerId) {
-          nextProducerIdBlock.put(Failure(new KafkaException(
-            s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data")))
+        if (data.producerIdStart() < currentProducerIdBlock.get.lastProducerId) {
+          error(s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data")
         } else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || data.producerIdStart() > Long.MaxValue - data.producerIdLen()) {
-          nextProducerIdBlock.put(Failure(new KafkaException(s"Producer ID block includes invalid ID range: $data")))
+          error(s"Producer ID block includes invalid ID range: $data")
         } else {
-          nextProducerIdBlock.put(
-            Success(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen())))
+          nextProducerIdBlock.set(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen()))
+          successfulResponse = true
         }
       case Errors.STALE_BROKER_EPOCH =>
-        warn("Our broker epoch was stale, trying again.")
-        maybeRequestNextBlock()
+        warn("Our broker currentBlockCount was stale, trying again.")
       case Errors.BROKER_ID_NOT_REGISTERED =>
         warn("Our broker ID is not yet known by the controller, trying again.")
-        maybeRequestNextBlock()
       case e: Errors =>
-        warn("Had an unknown error from the controller, giving up.")
-        nextProducerIdBlock.put(Failure(e.exception()))
+        error(s"Had an unknown error from the controller: ${e.exception}")
+    }
+    requestInFlight.set(false)
+    if (!successfulResponse) {
+      time.sleep(RetryBackoffMs)

Review Comment:
   I am not sure the sleep here is a good idea. I think this gets invoked from the `ControllerChannelManager` which is currently shared with the forwarding manager and others. Sleeping will block other requests as well. I think we need to enforce the backoff in `maybeRequestNextBlock`. 



##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -41,6 +43,7 @@ import scala.util.{Failure, Success, Try}
 object ProducerIdManager {
   // Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block
   val PidPrefetchThreshold = 0.90
+  val RetryBackoffMs = 100

Review Comment:
   Would 50ms be enough? It seems like we are missing some configuration for backoff behavior for requests from brokers to the controller. It would be nice to support exponential backoff as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110464884


##########
server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java:
##########
@@ -32,11 +34,25 @@ public class ProducerIdsBlock {
     private final int assignedBrokerId;
     private final long firstProducerId;
     private final int blockSize;
+    private final AtomicLong producerIdCounter;
 
     public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) {
         this.assignedBrokerId = assignedBrokerId;
         this.firstProducerId = firstProducerId;
         this.blockSize = blockSize;
+        producerIdCounter = new AtomicLong(firstProducerId);
+    }
+
+    /**
+     * Claim the next available producer id from the block.
+     * Returns an empty result if there are no more available producer ids in the block.
+     */
+    public Optional<Long> claimNextId() {

Review Comment:
   Probably helpful to have a basic unit test for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on PR #13267:
URL: https://github.com/apache/kafka/pull/13267#issuecomment-1569176269

   test failure
   `testSendOffsetsToTransactionTimeout(String).quorum=zk – kafka.api.TransactionsTest`
   
   ```
   org.apache.kafka.common.errors.TimeoutException: Timeout expired after 3000ms while awaiting InitProducerId
   ```
   which looks related to the changes, but the test passed locally several times.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110472349


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+    val currentBlockCount = blockCount.get
+    currentProducerIdBlock.get.claimNextId().asScala match {
+      case None =>
+        // Check the next block if current block is full
+        val block = nextProducerIdBlock.getAndSet(null)
         if (block == null) {
           // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
           // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
+          maybeRequestNextBlock(currentBlockCount)
+          Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
         } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+          // Fence other threads from sending another AllocateProducerIdsRequest
+          blockCount.incrementAndGet()

Review Comment:
   I was unable to confirm this through testing, but I still think there can be a race. Consider this scenario:
   
   block size = 10 for simplicity
   3 threads: t1, t2, and t3 all enter `generateProducerId()`. they start with `blockCount=1`
   `currentBlock (currentProducerIdBlock) = [0, 9, 9]` --> this represents `[first id, next pid to return, last id]`
   `nextBlock (nextProducerIdBlock) = null`
   
    
   - All 3 threads try to claim a pid; only one succeeds. Let's assume `t2` claimed the last pid, 9.
   - `t1` sees `nextBlock` is empty, requests new block and returns error.
   - `t1` handles another request, enters `generateProducerId()` at `blockCount=1`
   - broker to controller thread updates `nextBlock=[10, 10, 19]`
   - `t1` obtains `nextBlock [10, 10, 19]`. `nextBlock` is now `null` (L182)
   - before `t1` increments `blockCount` (L190), `t2` (nextPid=9) checks prefetch criteria (L202). 
   - `prefetchTarget = 0 + 10 * 0.9 = 9`  so `t2` invokes `maybeRequestNextBlock(blockCount=1)`. this passes because `requestInFlight == false`, `blockCount == 1`, and `nextBlock == null`
   - `t2` returns pid 9.
   - `t1` increments `blockCount=2`, sets `currentBlock=[10, 10, 19]`, and returns pid 10.
   - broker to controller thread updates `nextBlock=[20, 20, 29]` which is the prefetched block
   - `t3` now enters L180 since it failed to claim the pid when the current block pointed at `[0, 9, 9]`
   - `t3` obtains `nextBlock` and sets `currentBlock` to `[20, 20, 29]`
   - we lose an entire block `[10, 10, 19]`
   
   It might not be an entire block, but this case will lead to blocks not being fully exhausted. This is even rarer in practice because the block sizes are larger and `t3` would have to be idle for a longer period of time.
   
   I don't think this is possible with 2 threads, because we need 2 separate threads to fetch the 2 blocks and another thread that observes the initial block [0, 9, 9] and replaces it with [20, 20, 29].
   
   I'm not sure how to fix the 2 consecutive fetches. For replacing blocks, though, we can use `blockCount`: before replacing, confirm that the blockCount the thread observed is still equal to the current blockCount. Then `t3` will know it is outdated in the example above.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110463718


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)

Review Comment:
   I wonder if we could consolidate these two fields using the queue. We can peek in `generateProducerId` and attempt allocation while the background is responsible for pushing new blocks as needed. If it fails, we can dequeue the entry and loop in case there is another block. Once there are no blocks, we could return as in the current patch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1170068205


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)

Review Comment:
   updated to use requestInFlight to fence



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1170301813


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -207,35 +234,38 @@ class RPCProducerIdManager(brokerId: Int,
     })
   }
 
+  // Visible for testing
   private[transaction] def handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit = {
-    requestInFlight.set(false)
     val data = response.data
+    var successfulResponse = false
     Errors.forCode(data.errorCode()) match {
       case Errors.NONE =>
         debug(s"Got next producer ID block from controller $data")
         // Do some sanity checks on the response
-        if (data.producerIdStart() < currentProducerIdBlock.lastProducerId) {
-          nextProducerIdBlock.put(Failure(new KafkaException(
-            s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data")))
+        if (data.producerIdStart() < currentProducerIdBlock.get.lastProducerId) {
+          error(s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data")
         } else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || data.producerIdStart() > Long.MaxValue - data.producerIdLen()) {
-          nextProducerIdBlock.put(Failure(new KafkaException(s"Producer ID block includes invalid ID range: $data")))
+          error(s"Producer ID block includes invalid ID range: $data")
         } else {
-          nextProducerIdBlock.put(
-            Success(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen())))
+          nextProducerIdBlock.set(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen()))
+          successfulResponse = true
         }
       case Errors.STALE_BROKER_EPOCH =>
-        warn("Our broker epoch was stale, trying again.")
-        maybeRequestNextBlock()
+        warn("Our broker currentBlockCount was stale, trying again.")
       case Errors.BROKER_ID_NOT_REGISTERED =>
         warn("Our broker ID is not yet known by the controller, trying again.")
-        maybeRequestNextBlock()
       case e: Errors =>
-        warn("Had an unknown error from the controller, giving up.")
-        nextProducerIdBlock.put(Failure(e.exception()))
+        error(s"Had an unknown error from the controller: ${e.exception}")

Review Comment:
   nit: maybe we can rephrase this message a little for clarity
   ```scala
   error(s"Received an unexpected error code from the controller: $e")
   ```



##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    while (result == null) {

Review Comment:
   It's probably unlikely for this loop to continue indefinitely, but I wonder if we should bound the retries to a small fixed number (say 3). If we break out of the loop without a result, we can return COORDINATOR_LOAD_IN_PROGRESS. What do you think?



##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    while (result == null) {
+      currentProducerIdBlock.get.claimNextId().asScala match {
+        case None =>
+          // Check the next block if current block is full
+          val block = nextProducerIdBlock.getAndSet(null)
+          if (block == null) {
+            // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
+            // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+            maybeRequestNextBlock()
+            result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
+          } else {
+            currentProducerIdBlock.set(block)
+            requestInFlight.set(false)
           }
-        }
+
+        case Some(nextProducerId) =>
+          // Check if we need to prefetch the next block
+          val prefetchTarget = currentProducerIdBlock.get.firstProducerId + (currentProducerIdBlock.get.size * ProducerIdManager.PidPrefetchThreshold).toLong
+          if (nextProducerId == prefetchTarget) {
+            maybeRequestNextBlock()
+          }
+          result = Success(nextProducerId)
       }
-      nextProducerId
     }
+    result
   }
 
 
-  private def maybeRequestNextBlock(): Unit = {
-    if (nextProducerIdBlock.isEmpty && requestInFlight.compareAndSet(false, true)) {
+  // Visible for testing
+  private[transaction] def maybeRequestNextBlock(): Unit = {
+    if (nextProducerIdBlock.get == null &&
+      requestInFlight.compareAndSet(false, true) ) {
+
+      if (shouldBackoff.compareAndSet(true, false)) {
+        time.sleep(RetryBackoffMs)

Review Comment:
   Sleeping here will block the request thread. Instead of sleeping, we could set a backoff deadline and check whether it has been reached. If not, we break out of the loop early and return COORDINATOR_LOAD_IN_PROGRESS. Would that work?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1177161037


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+    val currentBlockCount = blockCount.get
+    currentProducerIdBlock.get.claimNextId().asScala match {
+      case None =>
+        // Check the next block if current block is full
+        val block = nextProducerIdBlock.getAndSet(null)
         if (block == null) {
           // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
           // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
+          maybeRequestNextBlock(currentBlockCount)
+          Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
         } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+          // Fence other threads from sending another AllocateProducerIdsRequest
+          blockCount.incrementAndGet()

Review Comment:
   This no longer happens because we can no longer send a request until `currentBlock` is set. `t2`, which checks the prefetch criteria in the example above, will observe either that `currentBlock` is `[10, 10, 19]`, which does not meet the prefetch criteria, or that `requestInFlight == true`, so it cannot send another request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1180578924


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    while (result == null) {

Review Comment:
   Ah, that makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1199413525


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +130,107 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  private val IterationLimit = 3

Review Comment:
   Since this is a constant, maybe we can move it to the companion object?



##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +130,107 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  private val IterationLimit = 3
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val backoffDeadlineMs = new AtomicLong(NoRetry)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    var iteration = 0
+    while (result == null) {
+      currentProducerIdBlock.get.claimNextId().asScala match {
+        case None =>
+          // Check the next block if current block is full
+          val block = nextProducerIdBlock.getAndSet(null)
+          if (block == null) {
+            // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
+            // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+            maybeRequestNextBlock()
+            result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
+          } else {
+            currentProducerIdBlock.set(block)
+            requestInFlight.set(false)
+            iteration = iteration + 1
+          }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+        case Some(nextProducerId) =>
+          // Check if we need to prefetch the next block
+          val prefetchTarget = currentProducerIdBlock.get.firstProducerId + (currentProducerIdBlock.get.size * ProducerIdManager.PidPrefetchThreshold).toLong
+          if (nextProducerId == prefetchTarget) {
+            maybeRequestNextBlock()
           }
-        }
+          result = Success(nextProducerId)
+      }
+      if (iteration == IterationLimit) {
+        result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
       }
-      nextProducerId
     }
+    result
   }
 
 
-  private def maybeRequestNextBlock(): Unit = {
-    if (nextProducerIdBlock.isEmpty && requestInFlight.compareAndSet(false, true)) {
-      sendRequest()
+  // Visible for testing
+  private[transaction] def maybeRequestNextBlock(): Unit = {
+    if (nextProducerIdBlock.get == null &&
+      requestInFlight.compareAndSet(false, true) ) {

Review Comment:
   Hmm, this check seems backwards to me. Could we check the backoff deadline first and then use `requestInFlight` to gate the sending of the request? If this thread sends the request successfully, then it can reset the backoff deadline.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1169133482


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)

Review Comment:
   Why is the `requestInFlight` flag not sufficient to prevent this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1210489030


##########
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##########
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+    // Send concurrent generateProducerId requests. Ensure that the generated producer id is unique.
+    // For each block (total 3 blocks), only "idBlockLen" number of requests should go through.
+    // All other requests should fail immediately.
+
+    val numThreads = 5
+    val latch = new CountDownLatch(idBlockLen * 3)
     val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-    IntStream.range(0, idBlockLen * 3).forEach { i =>
-      assertEquals(i, manager.generateProducerId())
+    val pidMap = mutable.Map[Long, Int]()
+    val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+    for ( _ <- 0 until numThreads) {
+      requestHandlerThreadPool.submit(() => {
+        while(latch.getCount > 0) {
+          val result = manager.generateProducerId()
+          result match {
+            case Success(pid) =>
+              pidMap synchronized {
+                if (latch.getCount != 0) {
+                  val counter = pidMap.getOrElse(pid, 0)
+                  pidMap += pid -> (counter + 1)
+                  latch.countDown()
+                }
+              }
+
+            case Failure(exception) =>
+              assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass)
+          }
+          Thread.sleep(100)
+        }
+      }, 0)
+    }
+    assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   Updated to 12. Yes, I've run it several times to make sure it's not flaky.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1262822068


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +131,103 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val backoffDeadlineMs = new AtomicLong(NoRetry)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    var iteration = 0
+    while (result == null) {
+      currentProducerIdBlock.get.claimNextId().asScala match {
+        case None =>
+          // Check the next block if current block is full
+          val block = nextProducerIdBlock.getAndSet(null)
+          if (block == null) {
+            // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
+            // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+            maybeRequestNextBlock()
+            result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))

Review Comment:
   I think so. Thoughts @hachikuji?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1266981147


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +131,103 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val backoffDeadlineMs = new AtomicLong(NoRetry)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    var iteration = 0
+    while (result == null) {
+      currentProducerIdBlock.get.claimNextId().asScala match {
+        case None =>
+          // Check the next block if current block is full
+          val block = nextProducerIdBlock.getAndSet(null)
+          if (block == null) {
+            // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
+            // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+            maybeRequestNextBlock()
+            result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))

Review Comment:
   created https://issues.apache.org/jira/browse/KAFKA-15207



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on PR #13267:
URL: https://github.com/apache/kafka/pull/13267#issuecomment-1602740242

   failure in `testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest` again
   
   ```
   org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting InitProducerId
   ```
   
   There are also these warnings in the log:
   ```
   [2023-06-22 00:21:19,219] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition __transaction_state-1. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread:70)
   [2023-06-22 00:21:19,329] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Partition topic1-1 marked as failed (kafka.server.ReplicaFetcherThread:70)
   [2023-06-22 00:21:19,330] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Partition topic2-1 marked as failed (kafka.server.ReplicaFetcherThread:70)
   ```
   
   Running it locally 50 times passes on both KRaft and ZK.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] artemlivshits commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "artemlivshits (via GitHub)" <gi...@apache.org>.
artemlivshits commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1174283192


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    while (result == null) {

Review Comment:
   I actually wonder if we need to loop at all; it seems like we only need to loop when we hit the branch
   
   ```
      currentProducerIdBlock.set(block)
      requestInFlight.set(false)
   ```
   I think we could do this instead:
   ```
      result = Success(block.claimNextId().asScala.get)
      currentProducerIdBlock.set(block)
      requestInFlight.set(false)
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on PR #13267:
URL: https://github.com/apache/kafka/pull/13267#issuecomment-1522547962

   @hachikuji this is ready for another round of review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1177161037


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+    val currentBlockCount = blockCount.get
+    currentProducerIdBlock.get.claimNextId().asScala match {
+      case None =>
+        // Check the next block if current block is full
+        val block = nextProducerIdBlock.getAndSet(null)
         if (block == null) {
           // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
           // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
+          maybeRequestNextBlock(currentBlockCount)
+          Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
         } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+          // Fence other threads from sending another AllocateProducerIdsRequest
+          blockCount.incrementAndGet()

Review Comment:
   This no longer happens because we now cannot send a request until `currentBlock` is set. `t2`, which checks the prefetch criteria in the example above, will observe either that `currentBlock` is `[10, 10, 19]`, which does not fit the prefetch criteria, or that `requestInFlight == true`, so it cannot send another request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110472349


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+    val currentBlockCount = blockCount.get
+    currentProducerIdBlock.get.claimNextId().asScala match {
+      case None =>
+        // Check the next block if current block is full
+        val block = nextProducerIdBlock.getAndSet(null)
         if (block == null) {
           // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
           // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
+          maybeRequestNextBlock(currentBlockCount)
+          Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
         } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+          // Fence other threads from sending another AllocateProducerIdsRequest
+          blockCount.incrementAndGet()

Review Comment:
   I was unable to confirm this through testing, but I still think there can be a race where we fetch two blocks together and replace the current block with the last block. Consider this scenario:
   
   block size = 10 for simplicity
   3 threads: t1, t2, and t3 all enter `generateProducerId()`. they start with `blockCount=1`
   `currentBlock (currentProducerIdBlock) = [0, 9, 9]` --> this represents `[first id, next pid to return, last id]`
   `nextBlock (nextProducerIdBlock) = null`
   
    
   - all 3 threads try to claim pid, only one succeeds. let's assume `t2` claimed the last pid, 9. 
   - `t1` sees `nextBlock` is empty, requests new block and returns error.
   - `t1` handles another request, enters `generateProducerId()` at `blockCount=1`
   - broker to controller thread updates `nextBlock=[10, 10, 19]`
   - `t1` obtains `nextBlock [10, 10, 19]`. `nextBlock` is now `null` (L182)
   - before `t1` increments `blockCount` (L190), `t2` (nextPid=9) checks prefetch criteria (L202). 
   - `prefetchTarget = 0 + 10 * 0.9 = 9`  so `t2` invokes `maybeRequestNextBlock(blockCount=1)`. this passes because `requestInFlight == false`, `blockCount == 1`, and `nextBlock == null`
   - `t2` returns pid 9.
   - `t1` increments `blockCount=2`, sets `currentBlock=[10, 10, 19]`, and returns pid 10.
   - broker to controller thread updates `nextBlock=[20, 20, 29]` which is the prefetched block
   - `t3` now enters L180 since it failed to claim the pid when the current block pointed at `[0, 9, 9]`
   - `t3` obtains `nextBlock` and sets `currentBlock` to `[20, 20, 29]`
   - we lose an entire block `[10, 10, 19]`
   
   It might not be an entire block, but this case will lead to blocks not being fully exhausted. This is even rarer in practice because the block sizes are larger and `t3` would have to be idle for a longer period of time.
   
   I don't think this is possible with 2 threads because we need 2 separate threads to fetch the 2 blocks and another thread that observes the initial block [0, 9, 9] and replaces it with [20, 20, 29].
   
   I'm not sure how to fix the 2 consecutive fetches, but for replacing non-consecutive blocks we can use `blockCount` and confirm that the thread's `blockCount` equals the current `blockCount` before replacing. Then `t3` will know that its view is outdated in the example above.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110474085


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)

Review Comment:
   I think that would work, but I'm not sure how much more readable the code would become. We would need to rely on the size of the queue (queueSize <= 1) to determine whether we can fetch the next block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1177143449


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    while (result == null) {
+      currentProducerIdBlock.get.claimNextId().asScala match {
+        case None =>
+          // Check the next block if current block is full
+          val block = nextProducerIdBlock.getAndSet(null)
+          if (block == null) {
+            // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
+            // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+            maybeRequestNextBlock()
+            result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
+          } else {
+            currentProducerIdBlock.set(block)
+            requestInFlight.set(false)
           }
-        }
+
+        case Some(nextProducerId) =>
+          // Check if we need to prefetch the next block
+          val prefetchTarget = currentProducerIdBlock.get.firstProducerId + (currentProducerIdBlock.get.size * ProducerIdManager.PidPrefetchThreshold).toLong
+          if (nextProducerId == prefetchTarget) {
+            maybeRequestNextBlock()
+          }
+          result = Success(nextProducerId)
       }
-      nextProducerId
     }
+    result
   }
 
 
-  private def maybeRequestNextBlock(): Unit = {
-    if (nextProducerIdBlock.isEmpty && requestInFlight.compareAndSet(false, true)) {
+  // Visible for testing
+  private[transaction] def maybeRequestNextBlock(): Unit = {
+    if (nextProducerIdBlock.get == null &&
+      requestInFlight.compareAndSet(false, true) ) {
+
+      if (shouldBackoff.compareAndSet(true, false)) {
+        time.sleep(RetryBackoffMs)

Review Comment:
   That is true, but in practice it will mainly block the controller thread as it retries, not the request thread. In fact, `shouldBackoff` is only set inside `handleAllocateProducerIdsResponse`, and the request is only retried when it has failed, so we shouldn't expect the request thread to ever reach this line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1206145853


##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala:
##########
@@ -61,27 +63,59 @@ class ProducerIdsIntegrationTest {
     clusterInstance.stop()
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO)
+  @Timeout(20)
+  def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: ClusterInstance): Unit = {
+    clusterInstance.config().serverProperties().put(KafkaConfig.NumIoThreadsProp, "1")
+    clusterInstance.start()
+    verifyUniqueIds(clusterInstance)
+    clusterInstance.stop()
+  }
+
+  @Disabled // TODO: Enable once producer id block size is configurable

Review Comment:
   I have appended the JIRA ticket at the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110473150


##########
server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java:
##########
@@ -32,11 +34,25 @@ public class ProducerIdsBlock {
     private final int assignedBrokerId;
     private final long firstProducerId;
     private final int blockSize;
+    private final AtomicLong producerIdCounter;
 
     public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) {
         this.assignedBrokerId = assignedBrokerId;
         this.firstProducerId = firstProducerId;
         this.blockSize = blockSize;
+        producerIdCounter = new AtomicLong(firstProducerId);
+    }
+
+    /**
+     * Claim the next available producer id from the block.
+     * Returns an empty result if there are no more available producer ids in the block.
+     */
+    public Optional<Long> claimNextId() {

Review Comment:
   thanks for the catch 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji merged pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji merged PR #13267:
URL: https://github.com/apache/kafka/pull/13267


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1257288831


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +131,103 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val backoffDeadlineMs = new AtomicLong(NoRetry)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    var iteration = 0
+    while (result == null) {
+      currentProducerIdBlock.get.claimNextId().asScala match {
+        case None =>
+          // Check the next block if current block is full
+          val block = nextProducerIdBlock.getAndSet(null)
+          if (block == null) {
+            // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
+            // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+            maybeRequestNextBlock()
+            result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))

Review Comment:
   I thought we would only do this for older clients and introduce a more descriptive error code for newer clients. What happened to that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1260523356


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +131,103 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val backoffDeadlineMs = new AtomicLong(NoRetry)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    var iteration = 0
+    while (result == null) {
+      currentProducerIdBlock.get.claimNextId().asScala match {
+        case None =>
+          // Check the next block if current block is full
+          val block = nextProducerIdBlock.getAndSet(null)
+          if (block == null) {
+            // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
+            // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+            maybeRequestNextBlock()
+            result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))

Review Comment:
   Hmm, I don't recall being involved in that discussion. Should I create a JIRA for it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org