You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/02 15:58:57 UTC

[GitHub] [kafka] mumrah commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

mumrah commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r534281054



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        throw new IllegalStateException("Encountered unexpected timeout when sending AlterIsr to the controller")

Review comment:
       We might also want to iterate through the in-flight ISR items (`inflightAlterIsrItems`) and complete their callbacks with an error. Otherwise, these partitions will not know to retry their ISR changes. They will eventually receive new metadata via LeaderAndIsr, but it would be better to keep them from getting into a bad state in the meantime.
   
   Something like
   ```scala
   inflightAlterIsrItems.foreach { item => item.callback.apply(Left(Errors.REQUEST_TIMED_OUT)) }
   ```
   
   I don't _think_ we need this in the `finally` block at L104, since `handleAlterIsrResponse` should be robust enough not to raise an exception.

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        throw new IllegalStateException("Encountered unexpected timeout when sending AlterIsr to the controller")

Review comment:
       Where will this exception be raised? I'm guessing on the BrokerToControllerChannelManager thread?
   
   If we time out and give up on reading a response, I think we should still clear the `inflightRequest` flag. Otherwise, AlterIsrManager will remain stuck with a request in flight and will be unable to send any more requests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org