Posted to jira@kafka.apache.org by "jeffkbkim (via GitHub)" <gi...@apache.org> on 2023/02/18 01:17:07 UTC

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110472349


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val blockCount = new AtomicLong(0)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
+  override def generateProducerId(): Try[Long] = {
+    val currentBlockCount = blockCount.get
+    currentProducerIdBlock.get.claimNextId().asScala match {
+      case None =>
+        // Check the next block if current block is full
+        val block = nextProducerIdBlock.getAndSet(null)
         if (block == null) {
           // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
           // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
+          maybeRequestNextBlock(currentBlockCount)
+          Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
         } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+          // Fence other threads from sending another AllocateProducerIdsRequest
+          blockCount.incrementAndGet()

Review Comment:
   I was unable to confirm this through testing, but I still think there can be a race. Consider this scenario:
   
   block size = 10 for simplicity
   3 threads: t1, t2, and t3 all enter `generateProducerId()`. They all start with `blockCount=1`.
   `currentBlock (currentProducerIdBlock) = [0, 9, 9]` --> this represents `[first id, next pid to return, last id]`
   `nextBlock (nextProducerIdBlock) = null`
   
    
   - All 3 threads try to claim a pid, but only one succeeds; let's assume `t2` claimed the last pid, 9.
   - `t1` sees `nextBlock` is empty, requests a new block, and returns an error.
   - `t1` handles another request and enters `generateProducerId()` at `blockCount=1`.
   - The broker-to-controller thread updates `nextBlock=[10, 10, 19]`.
   - `t1` obtains `nextBlock [10, 10, 19]`. `nextBlock` is now `null` (L182).
   - Before `t1` increments `blockCount` (L190), `t2` (nextPid=9) checks the prefetch criteria (L202).
   - `prefetchTarget = 0 + 10 * 0.9 = 9`, so `t2` invokes `maybeRequestNextBlock(blockCount=1)`. This passes because `requestInFlight == false`, `blockCount == 1`, and `nextBlock == null`.
   - `t2` returns pid 9.
   - `t1` increments `blockCount=2`, sets `currentBlock=[10, 10, 19]`, and returns pid 10.
   - The broker-to-controller thread updates `nextBlock=[20, 20, 29]`, which is the prefetched block.
   - `t3` now reaches L180, since it failed to claim a pid back when the current block pointed at `[0, 9, 9]`.
   - `t3` obtains `nextBlock` and sets `currentBlock` to `[20, 20, 29]`.
   - We lose the entire block `[10, 10, 19]`.
   
   It might not always be an entire block, but this case will lead to blocks not being fully exhausted. In practice this is even rarer because the block sizes are larger and `t3` would have to be idle for a longer period of time.
   
   I don't think this is possible with only 2 threads, because we need 2 separate threads to fetch the 2 blocks plus another thread that observed the initial block [0, 9, 9] and replaces it with [20, 20, 29].
   
   I'm not sure how to fix the 2 consecutive fetches, but for replacing blocks we can use `blockCount` and confirm that the thread's observed blockCount is equal to the current blockCount before replacing. Then `t3` will know it is outdated in the example above.
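   To make that concrete, here is a rough sketch of the guard, using simplified stand-ins for the real types (`Block`, `BlockSwapSketch`, the `IllegalStateException` failures, and the put-back on the stale path are illustrative only, not the PR's actual `ProducerIdsBlock` / `Errors` code):
   
   ```scala
   import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
   import scala.util.{Failure, Success, Try}
   
   // Simplified stand-in for ProducerIdsBlock, just for this sketch.
   final class Block(val firstId: Long, val lastId: Long) {
     private val nextId = new AtomicLong(firstId)
     def claimNextId(): Option[Long] = {
       val id = nextId.getAndIncrement()
       if (id <= lastId) Some(id) else None
     }
   }
   
   class BlockSwapSketch {
     private val blockCount = new AtomicLong(1)
     private val currentBlock = new AtomicReference(new Block(0, 9))
     private val nextBlock = new AtomicReference[Block](null)
   
     def generateProducerId(): Try[Long] = {
       val observedBlockCount = blockCount.get
       currentBlock.get.claimNextId() match {
         case Some(id) => Success(id)
         case None =>
           val block = nextBlock.getAndSet(null)
           if (block == null) {
             // maybeRequestNextBlock(observedBlockCount) would go here.
             Failure(new IllegalStateException("Producer ID block is full. Waiting for next block"))
           } else if (blockCount.compareAndSet(observedBlockCount, observedBlockCount + 1)) {
             // Only a thread whose observed block generation is still current may
             // install the new block; t3 from the example above fails the CAS
             // instead of clobbering [10, 10, 19] with [20, 20, 29].
             currentBlock.set(block)
             block.claimNextId() match {
               case Some(id) => Success(id)
               case None     => Failure(new IllegalStateException("Newly installed block already exhausted"))
             }
           } else {
             // Stale generation: put the block back rather than dropping it, and
             // let the caller retry. (This put-back is illustrative, not in the PR.)
             nextBlock.compareAndSet(null, block)
             Failure(new IllegalStateException("Block generation changed concurrently. Retry"))
           }
       }
     }
   }
   ```
   
   This only addresses the stale-replacement part; the 2 consecutive fetches would still need a separate fix.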
   


