You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/14 19:53:45 UTC

[GitHub] [kafka] mumrah opened a new pull request #9749: Only schedule AlterIsr thread when we have an ISR change

mumrah opened a new pull request #9749:
URL: https://github.com/apache/kafka/pull/9749


   Rather than scheduling every 50ms to check for unsent updates, we should schedule the propagation thread only after we receive ISR updates.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r556053749



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -112,68 +111,74 @@ class DefaultAlterIsrManager(
   val brokerEpochSupplier: () => Long
 ) extends AlterIsrManager with Logging with KafkaMetricsGroup {
 
-  // Used to allow only one pending ISR update per partition
-  private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
+  // Used to allow only one pending ISR update per partition (visible for testing)
+  private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
   private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
 
-  private val lastIsrPropagationMs = new AtomicLong(0)
-
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    val enqueued = unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    maybePropagateIsrChanges()
+    enqueued
   }
 
   override def clearPending(topicPartition: TopicPartition): Unit = {
     unsentIsrUpdates.remove(topicPartition)
   }
 
-  private def propagateIsrChanges(): Unit = {
+  private[server] def maybePropagateIsrChanges(): Unit = {
+    // Send all pending items if there is not already a request in-flight.
     if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) {
-      // Copy current unsent ISRs but don't remove from the map
+      // Copy current unsent ISRs but don't remove from the map, they get cleared in the response handler
       val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
       unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item))
-
-      val now = time.milliseconds()
-      lastIsrPropagationMs.set(now)
       sendRequest(inflightAlterIsrItems.toSeq)
     }
   }
 
-  private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
-    val message = buildRequest(inflightAlterIsrItems)
-
-    def clearInflightRequests(): Unit = {
-      // Be sure to clear the in-flight flag to allow future AlterIsr requests
-      if (!inflightRequest.compareAndSet(true, false)) {
-        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
-      }
+  private[server] def clearInFlightRequest(): Unit = {
+    if(!inflightRequest.compareAndSet(true, false)) {

Review comment:
       nit: space after `if`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r554113195



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -122,43 +121,47 @@ class DefaultAlterIsrManager(
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {
+        // optimistically set the inflight flag even though we haven't sent the request yet
+        scheduler.schedule("send-alter-isr", propagateIsrChanges, 1, -1, TimeUnit.MILLISECONDS)

Review comment:
       Yeah, I don't see any problem with that.

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -122,43 +121,47 @@ class DefaultAlterIsrManager(
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {

Review comment:
       You're right, there's a race between the isEmpty check and clearing the inflight flag. Good catch.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r554105440



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -122,43 +121,47 @@ class DefaultAlterIsrManager(
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {
+        // optimistically set the inflight flag even though we haven't sent the request yet
+        scheduler.schedule("send-alter-isr", propagateIsrChanges, 1, -1, TimeUnit.MILLISECONDS)

Review comment:
       If we are only waiting 1ms, would it be simpler to call `propagateIsrChanges` directly? Similarly after receiving a response with no error.

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -122,43 +121,47 @@ class DefaultAlterIsrManager(
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {

Review comment:
       This seems to introduce a race condition. Say we have an inflight request. Is the following sequence possible?
   
   1. the response returns and the io thread calls `propagateIsrChanges` and sees an empty `unsentIsrUpdates`
   2. request thread calls submit and inserts a new item in `unsentIsrUpdates`
   3. request thread fails `compareAndSet` on `inflightRequest`
   4. io thread clears `inflightRequest`
   
   It seems like we might need a lock.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#issuecomment-758679230


   It seems I over-complicated this while trying to avoid clearing the inflight flag in the response handler when another request needs to go out. I think what you suggest does work. If the response handler just immediately sets the inflight flag to false, it doesn't matter if another `submit` call races with checking the queue size, since one of them will set the inflight flag to true and the request will get submitted. 
   
   I'll go ahead and try this out


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r554229808



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,78 @@ class DefaultAlterIsrManager(
   private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
-  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+  @volatile
+  private var inflightRequest: Boolean = false
 
-  private val lastIsrPropagationMs = new AtomicLong(0)
+  // Protects the updates of the inflight flag and prevents new pending items from being submitted while we are
+  // preparing a request
+  private val inflightLock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    val (didSubmit, needsPropagate) = inReadLock(inflightLock) {
+      if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+        (true, !inflightRequest)
+      } else {
+        (false, false)
+      }
+    }
+    if (needsPropagate) {
+      propagateIsrChanges(true)
+    }
+    didSubmit
   }
 
   override def clearPending(topicPartition: TopicPartition): Unit = {
     unsentIsrUpdates.remove(topicPartition)
   }
 
-  private def propagateIsrChanges(): Unit = {
-    if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) {
-      // Copy current unsent ISRs but don't remove from the map
-      val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
-      unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item))
-
-      val now = time.milliseconds()
-      lastIsrPropagationMs.set(now)
-      sendRequest(inflightAlterIsrItems.toSeq)
+  private def propagateIsrChanges(checkInflight: Boolean): Unit = inWriteLock(inflightLock) {

Review comment:
       Do you think contention for this lock will be an issue? It seems unlikely to me. That makes me think we might be able to simplify the concurrency if we replace the read-write lock with a simple ReentrantLock. Then we could design this around two methods
   
   ```scala
   def maybePropagateIsrChanges(): Unit = lock synchronized {
    if (!inflightRequest) {
      ... 
    }
   }
   
   def clearInFlightRequest(): Unit = lock synchronized {
     inFlightRequest = false
   }
   ```
   What do you think?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r543057645



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -62,27 +61,38 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private val lastIsrPropagationMs = new AtomicLong(0)
 
-  override def start(): Unit = {
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
-  }
+  override def start(): Unit = { }
 
   override def enqueue(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {
+        // optimistically set the inflight flag even though we haven't sent the request yet
+        scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)

Review comment:
       The linger attempt feels a tad clumsy. I wonder if it is necessary. With the constraint of only having one inflight request, I think we will end up batching effectively when the rate of requests goes up. Most importantly, we'll be able to batch effectively when the controller is slow to respond. Would this be complex to implement?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r554113195



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -122,43 +121,47 @@ class DefaultAlterIsrManager(
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {
+        // optimistically set the inflight flag even though we haven't sent the request yet
+        scheduler.schedule("send-alter-isr", propagateIsrChanges, 1, -1, TimeUnit.MILLISECONDS)

Review comment:
       Yeah, I don't see any problem with that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r555127356



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,78 @@ class DefaultAlterIsrManager(
   private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
-  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+  @volatile
+  private var inflightRequest: Boolean = false
 
-  private val lastIsrPropagationMs = new AtomicLong(0)
+  // Protects the updates of the inflight flag and prevents new pending items from being submitted while we are
+  // preparing a request
+  private val inflightLock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    val (didSubmit, needsPropagate) = inReadLock(inflightLock) {
+      if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+        (true, !inflightRequest)
+      } else {
+        (false, false)
+      }
+    }
+    if (needsPropagate) {
+      propagateIsrChanges(true)
+    }
+    didSubmit
   }
 
   override def clearPending(topicPartition: TopicPartition): Unit = {
     unsentIsrUpdates.remove(topicPartition)
   }
 
-  private def propagateIsrChanges(): Unit = {
-    if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) {
-      // Copy current unsent ISRs but don't remove from the map
-      val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
-      unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item))
-
-      val now = time.milliseconds()
-      lastIsrPropagationMs.set(now)
-      sendRequest(inflightAlterIsrItems.toSeq)
+  private def propagateIsrChanges(checkInflight: Boolean): Unit = inWriteLock(inflightLock) {

Review comment:
       I was thinking of the case when lots of partitions come up at once, like in a rolling restart scenario. However, not much happens in the submit call, so it's probably negligible. I tried this change out this morning and there's no noticeable difference. I'll polish it up and fix the tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r554113380



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -122,43 +121,47 @@ class DefaultAlterIsrManager(
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {

Review comment:
       You're right, there's a race between the isEmpty check and clearing the inflight flag. Good catch.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r555294948



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,75 @@ class DefaultAlterIsrManager(
   private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
-  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+  @volatile
+  private var inflightRequest: Boolean = false
 
-  private val lastIsrPropagationMs = new AtomicLong(0)
+  // Protect updates of the inflight flag and prevent additional pending items from being submitted while we are
+  // preparing a request
+  private val inflightLock: ReentrantLock = new ReentrantLock()
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    inLock(inflightLock) {
+      if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+        maybePropagateIsrChanges()
+        true
+      } else {
+        false
+      }
+    }
   }
 
   override def clearPending(topicPartition: TopicPartition): Unit = {
     unsentIsrUpdates.remove(topicPartition)
   }
 
-  private def propagateIsrChanges(): Unit = {
-    if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) {
-      // Copy current unsent ISRs but don't remove from the map
+  private[server] def maybePropagateIsrChanges(): Unit = inLock(inflightLock) {
+    // Send all pending items if there is not already a request in-flight.
+    if (!inflightRequest && !unsentIsrUpdates.isEmpty) {
+      // Copy current unsent ISRs but don't remove from the map, they get cleared in the response handler
       val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
       unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item))
-
-      val now = time.milliseconds()
-      lastIsrPropagationMs.set(now)
       sendRequest(inflightAlterIsrItems.toSeq)
+      inflightRequest = true
     }
   }
 
-  private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
-    val message = buildRequest(inflightAlterIsrItems)
-
-    def clearInflightRequests(): Unit = {
-      // Be sure to clear the in-flight flag to allow future AlterIsr requests
-      if (!inflightRequest.compareAndSet(true, false)) {
-        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
-      }
+  private[server] def clearInFlightRequest(): Unit = inLock(inflightLock) {
+    if (!inflightRequest) {
+      warn("Attempting to clear AlterIsr in-flight flag when no apparent request is in-flight")
     }
+    inflightRequest = false
+  }
 
+  private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
+    val message = buildRequest(inflightAlterIsrItems)
     debug(s"Sending AlterIsr to controller $message")
 
     // We will not timeout AlterISR request, instead letting it retry indefinitely
     // until a response is received, or a new LeaderAndIsr overwrites the existing isrState
-    // which causes the inflight requests to be ignored.
+    // which causes the response for those partitions to be ignored.
     controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message),
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
-          try {
-            debug(s"Received AlterIsr response $response")
-            val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-            handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
-          } finally {
-            clearInflightRequests()
+          debug(s"Received AlterIsr response $response")
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems) match {
+            case Errors.NONE =>
+              // In the normal case, check for pending updates to send immediately
+              clearInFlightRequest()

Review comment:
       nit: shall we pull this out of the match, since it is done regardless? Now that we got rid of the `try/finally`, calling it before `handleAlterIsrResponse` would also make the code a little more resilient (an exception in the handler could no longer leave the in-flight flag set).

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,75 @@ class DefaultAlterIsrManager(
   private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
-  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+  @volatile
+  private var inflightRequest: Boolean = false
 
-  private val lastIsrPropagationMs = new AtomicLong(0)
+  // Protect updates of the inflight flag and prevent additional pending items from being submitted while we are
+  // preparing a request
+  private val inflightLock: ReentrantLock = new ReentrantLock()
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    inLock(inflightLock) {
+      if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+        maybePropagateIsrChanges()
+        true
+      } else {
+        false
+      }
+    }
   }
 
   override def clearPending(topicPartition: TopicPartition): Unit = {
     unsentIsrUpdates.remove(topicPartition)
   }
 
-  private def propagateIsrChanges(): Unit = {
-    if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) {
-      // Copy current unsent ISRs but don't remove from the map
+  private[server] def maybePropagateIsrChanges(): Unit = inLock(inflightLock) {
+    // Send all pending items if there is not already a request in-flight.
+    if (!inflightRequest && !unsentIsrUpdates.isEmpty) {
+      // Copy current unsent ISRs but don't remove from the map, they get cleared in the response handler
       val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
       unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item))
-
-      val now = time.milliseconds()
-      lastIsrPropagationMs.set(now)
       sendRequest(inflightAlterIsrItems.toSeq)
+      inflightRequest = true
     }
   }
 
-  private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
-    val message = buildRequest(inflightAlterIsrItems)
-
-    def clearInflightRequests(): Unit = {
-      // Be sure to clear the in-flight flag to allow future AlterIsr requests
-      if (!inflightRequest.compareAndSet(true, false)) {
-        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
-      }
+  private[server] def clearInFlightRequest(): Unit = inLock(inflightLock) {
+    if (!inflightRequest) {
+      warn("Attempting to clear AlterIsr in-flight flag when no apparent request is in-flight")
     }
+    inflightRequest = false
+  }
 
+  private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
+    val message = buildRequest(inflightAlterIsrItems)
     debug(s"Sending AlterIsr to controller $message")
 
     // We will not timeout AlterISR request, instead letting it retry indefinitely
     // until a response is received, or a new LeaderAndIsr overwrites the existing isrState
-    // which causes the inflight requests to be ignored.
+    // which causes the response for those partitions to be ignored.
     controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message),
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
-          try {
-            debug(s"Received AlterIsr response $response")
-            val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-            handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
-          } finally {
-            clearInflightRequests()
+          debug(s"Received AlterIsr response $response")
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems) match {

Review comment:
       Just double-checking our locking order. When we call `submit` from `Partition`, we first have the leader and ISR write lock and then we acquire the inflight lock added here. Now when we call `handleAlterIsrResponse`, we may need to reacquire the leader and ISR write lock, but that is ok, because we do not need to hold the inflight lock when we do so. I think it might be worth adding some comments on the locking order somewhere in this class since the use of the leader and ISR lock is kind of hidden.

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,75 @@ class DefaultAlterIsrManager(
   private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
-  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+  @volatile

Review comment:
       Do we still need this? It looks like all accesses are protected with the lock.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r554105440



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -122,43 +121,47 @@ class DefaultAlterIsrManager(
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {
+        // optimistically set the inflight flag even though we haven't sent the request yet
+        scheduler.schedule("send-alter-isr", propagateIsrChanges, 1, -1, TimeUnit.MILLISECONDS)

Review comment:
       If we are only waiting 1ms, would it be simpler to call `propagateIsrChanges` directly? Similarly after receiving a response with no error.

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -122,43 +121,47 @@ class DefaultAlterIsrManager(
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {

Review comment:
       This seems to introduce a race condition. Say we have an inflight request. Is the following sequence possible?
   
   1. the response returns and the io thread calls `propagateIsrChanges` and sees an empty `unsentIsrUpdates`
   2. request thread calls submit and inserts a new item in `unsentIsrUpdates`
   3. request thread fails `compareAndSet` on `inflightRequest`
   4. io thread clears `inflightRequest`
   
   It seems like we might need a lock.

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -116,63 +116,78 @@ class DefaultAlterIsrManager(
   private val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]()
 
   // Used to allow only one in-flight request at a time
-  private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
+  @volatile
+  private var inflightRequest: Boolean = false
 
-  private val lastIsrPropagationMs = new AtomicLong(0)
+  // Protects the updates of the inflight flag and prevents new pending items from being submitted while we are
+  // preparing a request
+  private val inflightLock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
 
   override def start(): Unit = {
     controllerChannelManager.start()
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
   }
 
   override def shutdown(): Unit = {
     controllerChannelManager.shutdown()
   }
 
   override def submit(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    val (didSubmit, needsPropagate) = inReadLock(inflightLock) {
+      if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+        (true, !inflightRequest)
+      } else {
+        (false, false)
+      }
+    }
+    if (needsPropagate) {
+      propagateIsrChanges(true)
+    }
+    didSubmit
   }
 
   override def clearPending(topicPartition: TopicPartition): Unit = {
     unsentIsrUpdates.remove(topicPartition)
   }
 
-  private def propagateIsrChanges(): Unit = {
-    if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) {
-      // Copy current unsent ISRs but don't remove from the map
-      val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
-      unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item))
-
-      val now = time.milliseconds()
-      lastIsrPropagationMs.set(now)
-      sendRequest(inflightAlterIsrItems.toSeq)
+  private def propagateIsrChanges(checkInflight: Boolean): Unit = inWriteLock(inflightLock) {

Review comment:
       Do you think contention for this lock will be an issue? It seems unlikely to me. That makes me think we might be able to simplify the concurrency if we replace the read-write lock with a simple ReentrantLock. Then we could design this around two methods
   
   ```scala
   def maybePropagateIsrChanges(): Unit = lock synchronized {
    if (!inflightRequest) {
      ... 
    }
   }
   
   def clearInFlightRequest(): Unit = lock synchronized {
     inflightRequest = false
   }
   ```
   What do you think?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9749:
URL: https://github.com/apache/kafka/pull/9749#discussion_r542714371



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -62,27 +61,38 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private val lastIsrPropagationMs = new AtomicLong(0)
 
-  override def start(): Unit = {
-    scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, 50, TimeUnit.MILLISECONDS)
-  }
+  override def start(): Unit = { }
 
   override def enqueue(alterIsrItem: AlterIsrItem): Boolean = {
-    unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null
+    if (unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null) {
+      if (inflightRequest.compareAndSet(false, true)) {
+        // optimistically set the inflight flag even though we haven't sent the request yet
+        scheduler.schedule("send-alter-isr", propagateIsrChanges, 50, -1, TimeUnit.MILLISECONDS)

Review comment:
       The 50ms delay here acts like a simplistic linger time to allow ISR updates to batch. If we schedule the thread immediately after the first call to `enqueue`, we'll likely only send a single partition in the AlterIsr request. With the small delay, we give partitions a chance to accumulate for sending as one batch.
   
   We might consider adding a real linger time with a maximum delay if we find that we're sending inefficient batches in practice.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #9749: Only schedule AlterIsr thread when we have an ISR change

Posted by GitBox <gi...@apache.org>.
mumrah merged pull request #9749:
URL: https://github.com/apache/kafka/pull/9749


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org