You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "hachikuji (via GitHub)" <gi...@apache.org> on 2023/02/18 00:35:05 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1110463718


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,95 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)

Review Comment:
   I wonder if we could consolidate these two fields using the queue. We can peek in `generateProducerId` and attempt allocation while the background is responsible for pushing new blocks as needed. If it fails, we can dequeue the entry and loop in case there is another block. Once there are no blocks, we could return as in the current patch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org