Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/05 07:54:08 UTC

[GitHub] [kafka] abbccdda opened a new pull request #9564: KAFKA-10667: add timeout for forwarding requests

abbccdda opened a new pull request #9564:
URL: https://github.com/apache/kafka/pull/9564


   Right now a forwarded request is retried indefinitely, which is not ideal. We should time out an enqueued request once it exceeds the request timeout.
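A minimal sketch of the behavior the description asks for, assuming a hypothetical `QueuedRequest` that remembers when it was first enqueued and carries an `onTimeout` callback (neither name is taken from the PR). On each pass, requests older than the request timeout are failed through the callback and dropped instead of being retried.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an entry in the broker-to-controller request queue.
final case class QueuedRequest(name: String, firstEnqueuedTimeMs: Long, onTimeout: () => Unit)

object ForwardingQueueSketch {
  /** Fails every request that has waited longer than `requestTimeoutMs`,
    * keeps only the remaining ones in the queue, and returns them. */
  def expire(queue: mutable.Queue[QueuedRequest], nowMs: Long, requestTimeoutMs: Long): List[QueuedRequest] = {
    val (expired, live) = queue.toList.partition(r => nowMs - r.firstEnqueuedTimeMs > requestTimeoutMs)
    expired.foreach(r => r.onTimeout()) // notify the caller instead of retrying forever
    queue.clear()
    queue ++= live
    live
  }
}
```

Measuring from the first enqueue time, rather than from the latest send attempt, is what bounds the total time a request can spend being retried.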
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r534281054



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        throw new IllegalStateException("Encountered unexpected timeout when sending AlterIsr to the controller")

Review comment:
       We might also want to iterate through the in-flight ISR items (`inflightAlterIsrItems`) and complete their callbacks with an error. Otherwise, these partitions will not know to retry their ISR changes. They will eventually receive new metadata via LeaderAndIsr, but it would be better to keep them out of a bad state in the meantime.
   
   Something like
   ```scala
   inflightAlterIsrItems.foreach { item => item.callback.apply(Left(Errors.REQUEST_TIMED_OUT)) }
   ```
   
   I don't _think_ we need this in the `finally` block at L104, since `handleAlterIsrResponse` should be fairly robust against raising an exception.
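The reviewer's suggestion can be sketched in isolation as below. `ApiError`, `RequestTimedOut`, `PendingIsrChange` and `TimeoutFanOut` are hypothetical simplifications; the snippet in the comment passes Kafka's `Errors.REQUEST_TIMED_OUT` on the `Left` side instead. Each in-flight item learns that its change failed, so the partition can retry rather than waiting for the next LeaderAndIsr.

```scala
// Hypothetical, simplified stand-ins for Kafka's error and ISR-item types.
sealed trait ApiError
case object RequestTimedOut extends ApiError

final case class PendingIsrChange(
    partition: String,
    newIsr: List[Int],
    callback: Either[ApiError, List[Int]] => Unit)

object TimeoutFanOut {
  /** On timeout, tell every in-flight partition that its change failed so it can retry later. */
  def failAll(inflight: Seq[PendingIsrChange], error: ApiError): Unit =
    inflight.foreach(item => item.callback(Left(error)))
}
```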

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        throw new IllegalStateException("Encountered unexpected timeout when sending AlterIsr to the controller")

Review comment:
       Where will this exception get raised? I'm guessing the BrokerToControllerChannelManager thread?
   
   If we time out and give up on reading a response, I think we should still clear the `inflightRequest` flag. Otherwise AlterIsrManager will be stuck with a request in flight and unable to send any more requests.
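A hypothetical, self-contained sketch of the point about the in-flight flag, using `java.util.concurrent.atomic.AtomicBoolean` the same way the diff uses `inflightRequest`. `SingleInflightSender` is an illustrative name, not Kafka's `AlterIsrManager`: the timeout path clears the flag just like the completion path, so a single timeout cannot block all later sends.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical single-in-flight sender; not Kafka's AlterIsrManager.
final class SingleInflightSender {
  private val inflightRequest = new AtomicBoolean(false)

  /** Returns true if a new request may start, i.e. nothing else is in flight. */
  def trySend(): Boolean = inflightRequest.compareAndSet(false, true)

  private def clearInflight(): Unit =
    if (!inflightRequest.compareAndSet(true, false))
      throw new IllegalStateException("Callback invoked when no request was in flight")

  def onComplete(): Unit = clearInflight()

  // Clearing the flag on timeout too: without this, one timeout would block every later request.
  def onTimeout(): Unit = clearInflight()
}
```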




[GitHub] [kafka] chia7712 commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r517868504



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -208,7 +209,9 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
   }
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
-    if (response.wasDisconnected()) {
+    if (isTimedOut(response)) {
+      debug(s"The request failed to send to the controller after timeout ${clientRequestTimeout} ms: $response")
+    } else if (response.wasDisconnected()) {

Review comment:
       (The following question is not related to this PR, just curious.)
   
   Is it possible to encounter an authentication error when forwarding? If so, should we avoid retrying such requests?




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r525764228



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -178,6 +183,11 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
     }
   }
 
+  private def isTimedOut(response: ClientResponse): Boolean = {
+    val requestCreatedTime = response.receivedTimeMs() - response.requestLatencyMs()

Review comment:
       Is it legitimate to compare against requestTimeout here, given that we are actually measuring how long the request stays buffered in the broker-to-controller channel queue? Should we introduce a separate timeout config for this?




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r533814137



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        warn(s"Encountered request when sending AlterIsr to the controller")

Review comment:
       That's what I eventually decided to do.




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r533091229



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        warn(s"Encountered request when sending AlterIsr to the controller")

Review comment:
       Not sure we can invoke the callback here, since the request has already failed. Maybe just do nothing here?




[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r533627543



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        warn(s"Encountered request when sending AlterIsr to the controller")

Review comment:
       Why not raise an exception?




[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r537788538



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -34,8 +34,12 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 trait BrokerToControllerChannelManager {
+
+  // The retry deadline will only be checked after receiving a response. This means that in the worst case,

Review comment:
       nit: turn this into a Scaladoc comment




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r518208011



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -208,7 +209,9 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
   }
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
-    if (response.wasDisconnected()) {
+    if (isTimedOut(response)) {
+      debug(s"The request failed to send to the controller after timeout ${clientRequestTimeout} ms: $response")
+    } else if (response.wasDisconnected()) {

Review comment:
       Thanks. For authorization-related exceptions, we could add logic to stop retrying as well.
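One way to express "stop retrying on authorization errors" is a classification step before a failed request is re-enqueued. The error and decision types below are hypothetical placeholders rather than Kafka's `Errors` enum, and which real errors belong in each bucket is an assumption.

```scala
// Hypothetical error categories; Kafka's real `Errors` enum is much larger.
sealed trait ControllerError
case object NotController extends ControllerError
case object Disconnected extends ControllerError
case object AuthorizationFailed extends ControllerError
case object AuthenticationFailed extends ControllerError

sealed trait Decision
case object RetryLater extends Decision
case object GiveUp extends Decision

object RetryPolicy {
  /** Transient routing or connection problems are retried; security failures will not fix themselves. */
  def decide(error: ControllerError): Decision = error match {
    case NotController | Disconnected               => RetryLater
    case AuthorizationFailed | AuthenticationFailed => GiveUp
  }
}
```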




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r525763121



##########
File path: clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java
##########
@@ -24,4 +24,11 @@
 
     void onComplete(ClientResponse response);
 
+    /**
+     * Fire when the request transmission hits a fatal exception.
+     *
+     * @param exception the thrown exception
+     */
+    default void onFailure(RuntimeException exception) {

Review comment:
       I see. If that is the case, IMHO we need a customized completion handler for both forwarding and AlterIsr.




[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r527324620



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onFailure(exception: RuntimeException): Unit = {
+        error(s"Encountered $exception when sending AlterIsr to the controller, clearing all pending states")
+        unsentIsrUpdates.clear()

Review comment:
       There may be requests that never had a chance to be sent?

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -178,6 +194,10 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
     }
   }
 
+  private def isTimedOut(request: BrokerToControllerQueueItem): Boolean = {
+    (time.milliseconds() - request.firstEnqueuedTimeMs) > clientRequestTimeout

Review comment:
       We do not want to time out requests from the broker such as `AlterIsr`. We only want this for client requests.
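The distinction drawn here can be modelled by giving each queue item an optional deadline: forwarded client requests get one, while broker-originated requests such as `AlterIsr` get none and keep retrying. `ControllerQueueItem` and `DeadlinePolicy` are hypothetical names used only for this sketch.

```scala
// Hypothetical queue item: `deadlineMs = None` means "retry until it succeeds".
final case class ControllerQueueItem(name: String, deadlineMs: Option[Long])

object DeadlinePolicy {
  /** Only items that carry a deadline can ever time out. */
  def isTimedOut(item: ControllerQueueItem, nowMs: Long): Boolean =
    item.deadlineMs.exists(deadline => nowMs > deadline)

  def forForwardedClientRequest(name: String, enqueuedMs: Long, timeoutMs: Long): ControllerQueueItem =
    ControllerQueueItem(name, Some(enqueuedMs + timeoutMs))

  def forBrokerRequest(name: String): ControllerQueueItem =
    ControllerQueueItem(name, None)
}
```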

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onFailure(exception: RuntimeException): Unit = {

Review comment:
       Don't we need to invoke the callbacks?




[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r525598620



##########
File path: clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java
##########
@@ -24,4 +24,11 @@
 
     void onComplete(ClientResponse response);
 
+    /**
+     * Fire when the request transmission hits a fatal exception.
+     *
+     * @param exception the thrown exception
+     */
+    default void onFailure(RuntimeException exception) {

Review comment:
       Here is the relevant method of the `KafkaClient` interface:
   ```java
   ClientRequest newClientRequest(String nodeId,
                                  AbstractRequest.Builder<?> requestBuilder,
                                  long createdTimeMs,
                                  boolean expectResponse,
                                  int requestTimeoutMs,
                                  RequestCompletionHandler callback);
   ```
   It is misleading to add an `onFailure` callback to `RequestCompletionHandler` if it is not going to be used by `KafkaClient` implementations such as `NetworkClient`. The usage in `ConsumerNetworkClient` is different because it is internal. In general, we should avoid leaking implementation details into the interfaces.
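A sketch of the layering argued for here, with hypothetical names: the public callback trait (standing in for `RequestCompletionHandler`) stays unchanged, and only an internal sub-trait, shaped like the `BrokerToControllerRequestCompletionHandler` in the AlterIsrManager diff, adds `onTimeout`. Only the broker-to-controller channel, which knows about its own timeouts, ever calls it.

```scala
// Stand-in for the public, client-facing callback: no timeout method leaks into it.
trait CompletionHandler {
  def onComplete(response: String): Unit
}

// Hypothetical internal extension used only by the broker-to-controller channel.
trait ControllerCompletionHandler extends CompletionHandler {
  def onTimeout(): Unit
}

final class ControllerChannelSketch {
  /** Delivers the response if one arrived, otherwise reports a timeout. */
  def finish(handler: ControllerCompletionHandler, response: Option[String]): Unit =
    response match {
      case Some(r) => handler.onComplete(r)
      case None    => handler.onTimeout()
    }
}
```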




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r527359174



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onFailure(exception: RuntimeException): Unit = {
+        error(s"Encountered $exception when sending AlterIsr to the controller, clearing all pending states")
+        unsentIsrUpdates.clear()

Review comment:
       Yeah, I'm not sure whether we should clear all the pending updates here. I just saw the comment `Regardless of callback outcome, we need to clear from the unsent updates map to unblock further updates`, so I was thinking we could simply remove all pending updates here.
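A possible middle ground, given the earlier question about updates that never had a chance to be sent: remove only the partitions that were part of the failed request and leave the rest queued. `PendingUpdatesSketch` and the `String -> List[Int]` map shape are hypothetical simplifications of the unsent-updates map.

```scala
import scala.collection.mutable

object PendingUpdatesSketch {
  /** Drops only the partitions whose updates were in the failed request;
    * updates queued after it was built stay in the map for the next send. */
  def removeInflight(unsent: mutable.Map[String, List[Int]], inflightPartitions: Seq[String]): Unit =
    inflightPartitions.foreach(p => unsent.remove(p))
}
```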




[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r525428214



##########
File path: clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java
##########
@@ -24,4 +24,11 @@
 
     void onComplete(ClientResponse response);
 
+    /**
+     * Fire when the request transmission hits a fatal exception.
+     *
+     * @param exception the thrown exception
+     */
+    default void onFailure(RuntimeException exception) {

Review comment:
       I am not so sure about adding this to `RequestCompletionHandler`. This interface is mainly serving `NetworkClient`, but there are no changes to `NetworkClient` here.

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -178,6 +183,11 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
     }
   }
 
+  private def isTimedOut(response: ClientResponse): Boolean = {
+    val requestCreatedTime = response.receivedTimeMs() - response.requestLatencyMs()

Review comment:
       The time we need to track should begin when the request is submitted to the manager. This looks like it is just checking how long an individual request is taking, which the request timeout in `NetworkClient` already takes care of.
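The difference between the two measurements can be shown with hypothetical numbers. If each retry creates a fresh client request, `receivedTimeMs() - requestLatencyMs()` only recovers when the latest attempt was created, so every attempt can look fast while the request as a whole has been pending far longer than the timeout. `Attempt` and `ElapsedTime` are illustrative names.

```scala
// Hypothetical record of one send attempt, in milliseconds.
final case class Attempt(sentMs: Long, receivedMs: Long)

object ElapsedTime {
  /** What a check based on a single response can see: the latency of the last attempt only. */
  def lastAttemptLatency(attempts: Seq[Attempt]): Long =
    attempts.last.receivedMs - attempts.last.sentMs

  /** What a manager-level timeout should measure: time since the request was first submitted. */
  def sinceSubmission(submittedMs: Long, nowMs: Long): Long = nowMs - submittedMs
}
```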




[GitHub] [kafka] abbccdda merged pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda merged pull request #9564:
URL: https://github.com/apache/kafka/pull/9564


   


[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r523184916



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -208,7 +209,9 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
   }
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
-    if (response.wasDisconnected()) {
+    if (isTimedOut(response)) {
+      debug(s"The request failed to send to the controller after timeout ${clientRequestTimeout} ms: $response")

Review comment:
       I think `failExpiredRequests` at the `InterBrokerSendThread` level should be triggered and reach the callback.
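As a rough illustration of the expiry idea (a hypothetical sketch; the real `failExpiredRequests` in `InterBrokerSendThread` may work differently), a sweep can drop queued items whose deadline has passed and fire their timeout callback instead of sending them. `PendingRequest`, `TimeoutCallback` and `ExpirySweeper` are made-up names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical sketch of an expiry sweep; not InterBrokerSendThread's actual code.
interface TimeoutCallback {
    void onTimeout();
}

final class PendingRequest {
    final String name;
    final long deadlineMs;
    final TimeoutCallback callback;

    PendingRequest(String name, long deadlineMs, TimeoutCallback callback) {
        this.name = name;
        this.deadlineMs = deadlineMs;
        this.callback = callback;
    }
}

final class ExpirySweeper {
    // Removes every queued request whose deadline has passed, fires its timeout
    // callback, and returns how many were expired. Unexpired requests keep their order.
    static int failExpired(Deque<PendingRequest> queue, long nowMs) {
        int expired = 0;
        Iterator<PendingRequest> it = queue.iterator();
        while (it.hasNext()) {
            PendingRequest request = it.next();
            if (nowMs > request.deadlineMs) {
                it.remove();
                request.callback.onTimeout();
                expired++;
            }
        }
        return expired;
    }
}
```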




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r523184916



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -208,7 +209,9 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
   }
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
-    if (response.wasDisconnected()) {
+    if (isTimedOut(response)) {
+      debug(s"The request failed to send to the controller after timeout ${clientRequestTimeout} ms: $response")

Review comment:
       Yea, I think we could actually fire the callback here.
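A sketch of the dispatch this points toward, assuming each queued item carries an absolute retry deadline and that the response's receive time serves as the current time (the reuse of `ClientResponse.receivedTimeMs` suggested elsewhere in this review). `ControllerCallback`, `Outcome` and `ResponseDispatcher` are hypothetical, and the string body stands in for `ClientResponse`.

```java
// Hypothetical sketch; the interface and enum below are illustrative, not Kafka's API.
interface ControllerCallback {
    void onComplete(String responseBody);
    void onTimeout();
}

enum Outcome { TIMED_OUT, RETRY, COMPLETED }

final class ResponseDispatcher {
    // Routes each response to exactly one action. The deadline is checked first so
    // that an expired item is not retried again after a disconnect.
    static Outcome handle(long receivedTimeMs, long retryDeadlineMs, boolean disconnected,
                          String responseBody, ControllerCallback callback) {
        if (receivedTimeMs > retryDeadlineMs) {
            callback.onTimeout();
            return Outcome.TIMED_OUT;
        }
        if (disconnected) {
            // The caller is expected to re-enqueue the item (possibly for a new controller).
            return Outcome.RETRY;
        }
        callback.onComplete(responseBody);
        return Outcome.COMPLETED;
    }
}
```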




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r523183433



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -208,7 +209,9 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
   }
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
-    if (response.wasDisconnected()) {

Review comment:
       I think so




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r534352532



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        throw new IllegalStateException("Encountered unexpected timeout when sending AlterIsr to the controller")

Review comment:
       We set the timeout of AlterIsrManager to infinity, so it should never happen. See comment from Jason: https://github.com/apache/kafka/pull/9564#discussion_r527326231




[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r533024276



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -35,7 +35,8 @@ import scala.jdk.CollectionConverters._
 
 trait BrokerToControllerChannelManager {
   def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                  callback: RequestCompletionHandler): Unit
+                  callback: BrokerToControllerRequestCompletionHandler,
+                  requestTimeout: Long): Unit

Review comment:
       Perhaps we could use a name like `retryTimeout` to distinguish this from the request timeout which only applies to individual requests. Alternatively we could let the caller provide the retry deadline explicitly. This would save the need for the extra `time.milliseconds` call.

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -178,6 +191,10 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
     }
   }
 
+  private def isTimedOut(request: BrokerToControllerQueueItem): Boolean = {

Review comment:
       nit: `hasTimedOut`?

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -125,15 +126,25 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
   }
 
   override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                           callback: RequestCompletionHandler): Unit = {
-    requestQueue.put(BrokerToControllerQueueItem(request, callback))
+                           callback: BrokerToControllerRequestCompletionHandler,
+                           requestTimeout: Long): Unit = {
+    requestQueue.put(BrokerToControllerQueueItem(request, callback, time.milliseconds() + requestTimeout))

Review comment:
       Won't this overflow with `requestTimeout` set to `Long.MaxValue`? Do we have any test cases?
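One way to avoid the overflow, sketched in Java with a hypothetical `Deadlines` helper (not a Kafka API), is to saturate the sum at `Long.MAX_VALUE` instead of letting it wrap. `Math.addExact` would also detect the overflow, but it throws rather than clamping. The sketch assumes `nowMs` is a non-negative wall-clock value.

```java
// Hypothetical helper; not part of Kafka. Computes an absolute deadline without
// wrapping around when the timeout is "infinite" (Long.MAX_VALUE).
final class Deadlines {
    static long deadlineMs(long nowMs, long timeoutMs) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("timeoutMs must be non-negative: " + timeoutMs);
        }
        // Assuming nowMs >= 0, nowMs + timeoutMs overflows exactly when
        // timeoutMs > Long.MAX_VALUE - nowMs, so clamp in that case.
        return timeoutMs > Long.MAX_VALUE - nowMs ? Long.MAX_VALUE : nowMs + timeoutMs;
    }
}
```

With the naive sum, `1000 + Long.MAX_VALUE` wraps to a negative number, so a check like `now > deadline` would be true immediately and an "infinite" timeout would expire at once.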

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -178,6 +191,10 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
     }
   }
 
+  private def isTimedOut(request: BrokerToControllerQueueItem): Boolean = {
+    time.milliseconds() > request.deadlineMs

Review comment:
       Maybe we can avoid this call to `time.milliseconds` and use `ClientResponse.receivedTimeMs`?

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -165,7 +176,9 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
   }
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
-    if (response.wasDisconnected()) {
+    if (isTimedOut(request)) {

Review comment:
       We check for timeouts only after receiving a response. I guess this means that in the worst case, the total timeout would be request.timeout*2. This is probably not a big deal, but maybe worth documenting in a comment somewhere. 
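The worst case can be made concrete with a little arithmetic. Assuming the deadline is checked only when a response arrives (including the failed response `NetworkClient` produces when an attempt exceeds its request timeout), an attempt sent just before the retry deadline can run for one full request timeout before the check happens. `WorstCase` is a hypothetical helper; queueing and scheduling delays are ignored.

```java
// Hypothetical arithmetic sketch of the bound described above; not Kafka code.
final class WorstCase {
    // Approximate latest time at which onTimeout can fire: an attempt sent just before
    // the retry deadline may take a full request timeout before its response arrives.
    static long latestTimeoutNotificationMs(long submittedAtMs, long retryTimeoutMs,
                                            long requestTimeoutMs) {
        return submittedAtMs + retryTimeoutMs + requestTimeoutMs;
    }
}
```

With both timeouts at 30 000 ms, the callback fires no later than roughly 60 000 ms after submission, which is the `request.timeout*2` bound mentioned above.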

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -44,26 +46,34 @@ class ForwardingManager(channelManager: BrokerToControllerChannelManager) extend
       request.context.clientAddress.getAddress
     )
 
-    def onClientResponse(clientResponse: ClientResponse): Unit = {
-      val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse]
-      val envelopeError = envelopeResponse.error()
-      val requestBody = request.body[AbstractRequest]
+    class ForwardingResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(clientResponse: ClientResponse): Unit = {
+        val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse]
+        val envelopeError = envelopeResponse.error()
+        val requestBody = request.body[AbstractRequest]
 
-      val response = if (envelopeError != Errors.NONE) {
-        // An envelope error indicates broker misconfiguration (e.g. the principal serde
-        // might not be defined on the receiving broker). In this case, we do not return
-        // the error directly to the client since it would not be expected. Instead we
-        // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
-        // on the broker.
-        debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError")
-        requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
-      } else {
-        parseResponse(envelopeResponse.responseData, requestBody, request.header)
+        val response = if (envelopeError != Errors.NONE) {
+          // An envelope error indicates broker misconfiguration (e.g. the principal serde
+          // might not be defined on the receiving broker). In this case, we do not return
+          // the error directly to the client since it would not be expected. Instead we
+          // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
+          // on the broker.
+          debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError")
+          requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
+        } else {
+          parseResponse(envelopeResponse.responseData, requestBody, request.header)
+        }
+        responseCallback(response)
+      }
+
+      override def onTimeout(): Unit = {
+        error(s"Forwarding of the request $request failed due to timeout exception")

Review comment:
       I think this should be debug. Users will already have visibility into the error through the request log and the error metrics. There's probably a stronger case to increase the level for the unknown error case in `onComplete` above, but I'm fine letting them both be debug.

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        warn(s"Encountered request when sending AlterIsr to the controller")

Review comment:
       If we don't expect this, maybe we should just raise an exception. Alternatively if we are not going to fail, perhaps we should go ahead and invoke the callbacks.

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -125,15 +126,25 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
   }
 
   override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                           callback: RequestCompletionHandler): Unit = {
-    requestQueue.put(BrokerToControllerQueueItem(request, callback))
+                           callback: BrokerToControllerRequestCompletionHandler,
+                           requestTimeout: Long): Unit = {
+    requestQueue.put(BrokerToControllerQueueItem(request, callback, time.milliseconds() + requestTimeout))
     requestThread.wakeup()
   }
 
 }
 
+abstract class BrokerToControllerRequestCompletionHandler extends RequestCompletionHandler {
+
+  /**
+   * Fire when the request transmission hits a fatal exception.

Review comment:
       This comment needs to be updated.




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r533091766



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -125,15 +126,25 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
   }
 
   override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                           callback: RequestCompletionHandler): Unit = {
-    requestQueue.put(BrokerToControllerQueueItem(request, callback))
+                           callback: BrokerToControllerRequestCompletionHandler,
+                           requestTimeout: Long): Unit = {
+    requestQueue.put(BrokerToControllerQueueItem(request, callback, time.milliseconds() + requestTimeout))

Review comment:
       Good catch!




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r525507232



##########
File path: clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java
##########
@@ -24,4 +24,11 @@
 
     void onComplete(ClientResponse response);
 
+    /**
+     * Fire when the request transmission hits a fatal exception.
+     *
+     * @param exception the thrown exception
+     */
+    default void onFailure(RuntimeException exception) {

Review comment:
       We do have a case in `ConsumerNetworkClient` which adds an `onFailure` callback. To me it makes sense to include it as part of the RequestCompletionHandler interface. 
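For reference, a default method is what lets `onFailure` be added without breaking existing callers: any implementations written as lambdas keep compiling because the interface still has a single abstract method. The sketch below uses a hypothetical `CompletionHandler`; the diff above is cut off before the body of `onFailure`, so the no-op default here is an assumption.

```java
// Hypothetical stand-in for RequestCompletionHandler; not the actual Kafka interface.
interface CompletionHandler {
    void onComplete(String response);

    // Adding a default method keeps this a functional interface, so existing lambda
    // implementations continue to compile unchanged.
    default void onFailure(RuntimeException exception) {
        // Assumed default: do nothing; handlers that care about failures override it.
    }
}
```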




[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r537709868



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)

Review comment:
       nit: drop parenthesis after `brokerEpoch`

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -125,15 +126,24 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
   }
 
   override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                           callback: RequestCompletionHandler): Unit = {
-    requestQueue.put(BrokerToControllerQueueItem(request, callback))
+                           callback: BrokerToControllerRequestCompletionHandler,
+                           retryDeadlineMs: Long): Unit = {
+    requestQueue.put(BrokerToControllerQueueItem(request, callback, retryDeadlineMs))
     requestThread.wakeup()
   }
+}
+
+abstract class BrokerToControllerRequestCompletionHandler extends RequestCompletionHandler {

Review comment:
       How about `ControllerRequestCompletionHandler`?

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        throw new IllegalStateException("Encountered unexpected timeout when sending AlterIsr to the controller")
+      }
     }
 
     debug(s"Sending AlterIsr to controller $message")
-    controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message), responseHandler)
+    // We will not timeout AlterISR request, instead letting it retry indefinitely.

Review comment:
       Perhaps add some more detail: "... letting it retry indefinitely until a response is received or the request is cancelled after receiving new `LeaderAndIsr` state".

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -125,15 +126,24 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
   }
 
   override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                           callback: RequestCompletionHandler): Unit = {
-    requestQueue.put(BrokerToControllerQueueItem(request, callback))
+                           callback: BrokerToControllerRequestCompletionHandler,
+                           retryDeadlineMs: Long): Unit = {
+    requestQueue.put(BrokerToControllerQueueItem(request, callback, retryDeadlineMs))
     requestThread.wakeup()
   }
+}
+
+abstract class BrokerToControllerRequestCompletionHandler extends RequestCompletionHandler {
 
+  /**
+   * Fire when the request transmission hits timeout.

Review comment:
       Can we document the difference between this and the request timeout? 

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -178,6 +190,12 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
     }
   }
 
+  // The timeout will only be checked after receiving a response. This means that in the worst case,

Review comment:
       Can we move this comment to the doc for `sendRequest`?
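A sketch of how both points could be captured in Javadoc, on the `sendRequest` contract and on the timeout callback. `ControllerChannelSketch` and `ControllerHandlerSketch` are hypothetical stand-ins for the Scala trait and handler in this PR, and the wording describes the behavior discussed in this thread rather than Kafka's final documentation.

```java
/**
 * Hypothetical sketch of how the two timeouts could be documented; not Kafka's API.
 */
interface ControllerChannelSketch {
    /**
     * Enqueues a request for the active controller and retries it (for example after a
     * disconnect or controller change) until a response arrives or the retry deadline passes.
     *
     * <p>The retry deadline bounds the whole lifetime of the request across all attempts,
     * whereas {@code request.timeout.ms} bounds a single network attempt. Because the deadline
     * is checked only when a response arrives, the callback may be notified up to roughly
     * one request timeout after {@code retryDeadlineMs}.
     *
     * @param retryDeadlineMs absolute time in milliseconds after which no further retries are made
     */
    void sendRequest(String request, ControllerHandlerSketch callback, long retryDeadlineMs);
}

abstract class ControllerHandlerSketch {
    abstract void onComplete(String response);

    /**
     * Fires when the retry deadline passed to {@code sendRequest} expires before a response
     * is received. In this sketch, a single attempt exceeding the network request timeout
     * is assumed to be retried rather than reported here.
     */
    abstract void onTimeout();
}
```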




[GitHub] [kafka] abbccdda commented on pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#issuecomment-740244739


   Hit unrelated stream test failure, retest


[GitHub] [kafka] mumrah commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r537948141



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,34 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends ControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        throw new IllegalStateException("Encountered unexpected timeout when sending AlterIsr to the controller")
+      }
     }
 
     debug(s"Sending AlterIsr to controller $message")
-    controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message), responseHandler)
+    // We will not timeout AlterISR request, instead letting it retry indefinitely
+    // until a response is received or the request is cancelled after receiving new LeaderAndIsr state.

Review comment:
       I don't think we actually have the ability to cancel in-flight requests. The problem is that many partitions can have ISR changes batched together in a single request. So we couldn't cancel one without cancelling them all. 
   
   A new LeaderAndIsr arriving will just overwrite the existing `isrState` in Partition, which will effectively cause any in-flight AlterIsr response to be ignored (for that partition).
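A minimal sketch of this "newer state wins" behavior, under stated assumptions: `PartitionSketch` and `IsrStateSketch` are hypothetical, and the staleness check uses reference identity of the state object the request was built from, which may not match how Kafka's `Partition` actually compares states.

```java
import java.util.List;

// Hypothetical sketch; these classes are illustrative, not Kafka's Partition.
final class IsrStateSketch {
    final int leaderEpoch;
    final List<Integer> isr;

    IsrStateSketch(int leaderEpoch, List<Integer> isr) {
        this.leaderEpoch = leaderEpoch;
        this.isr = List.copyOf(isr);
    }
}

final class PartitionSketch {
    private IsrStateSketch isrState;

    PartitionSketch(IsrStateSketch initial) {
        this.isrState = initial;
    }

    synchronized IsrStateSketch isrState() {
        return isrState;
    }

    // A LeaderAndIsr from the controller simply replaces the current state.
    synchronized void onLeaderAndIsr(IsrStateSketch fromController) {
        isrState = fromController;
    }

    // An AlterIsr result is applied only if the state the request was built from is still
    // current; otherwise it has been superseded and is ignored for this partition.
    synchronized boolean onAlterIsrResponse(IsrStateSketch sentWith, IsrStateSketch result) {
        if (isrState != sentWith) {
            return false;
        }
        isrState = result;
        return true;
    }
}
```

Because the check is per partition, a batched AlterIsr response can still be applied to the partitions whose state did not change in the meantime.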

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,34 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends ControllerRequestCompletionHandler {

Review comment:
       nit: could we inline this as an anonymous class down below rather than defining it here? Just seems a little odd to define it like this and then calling in to the containing class for `inflightRequest`. 
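A sketch of the suggested shape, in Java: the handler is created as an anonymous class at the point of use, so it reads `inflightRequest` directly instead of going through a separately defined helper. In Scala the same idea would be `new ControllerRequestCompletionHandler { ... }` at the call site. `AlterIsrManagerSketch` and `CompletionHandlerSketch` are hypothetical, and the string response stands in for `ClientResponse`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for ControllerRequestCompletionHandler and AlterIsrManager.
abstract class CompletionHandlerSketch {
    abstract void onComplete(String response);
    abstract void onTimeout();
}

final class AlterIsrManagerSketch {
    final AtomicBoolean inflightRequest = new AtomicBoolean(false);
    String lastHandled;

    CompletionHandlerSketch send() {
        if (!inflightRequest.compareAndSet(false, true)) {
            throw new IllegalStateException("AlterIsr request already in flight");
        }
        // Anonymous class defined where it is used; it touches inflightRequest directly.
        return new CompletionHandlerSketch() {
            @Override
            void onComplete(String response) {
                try {
                    lastHandled = response;
                } finally {
                    // Clear the in-flight flag so future AlterIsr requests can be sent.
                    if (!inflightRequest.compareAndSet(true, false)) {
                        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight");
                    }
                }
            }

            @Override
            void onTimeout() {
                // The retry deadline is effectively infinite for AlterIsr, so this is unexpected.
                throw new IllegalStateException("Encountered unexpected timeout when sending AlterIsr to the controller");
            }
        };
    }
}
```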

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        throw new IllegalStateException("Encountered unexpected timeout when sending AlterIsr to the controller")

Review comment:
       Thanks, makes sense. 




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r527359444



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onFailure(exception: RuntimeException): Unit = {

Review comment:
       You mean `handleAlterIsrResponse`?







[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r537786674



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -208,7 +209,9 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
   }
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
-    if (response.wasDisconnected()) {
+    if (isTimedOut(response)) {
+      debug(s"The request failed to send to the controller after timeout ${clientRequestTimeout} ms: $response")
+    } else if (response.wasDisconnected()) {

Review comment:
       Note that we handle authentication exceptions in `InterBrokerSendThread`: we log a message and mark the response as "disconnected." I think it is reasonable to keep retrying in that case, both because the broker's configuration might change and so that the error messages keep appearing in the logs.
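In `handleResponse` the deadline check runs before the disconnect check. A request that keeps failing, including an authentication failure surfaced as a disconnect, is therefore retried only until its deadline passes. The sketch below shows that ordering with hypothetical stand-in types (`Response`, `QueueItem`, `Outcome`). The PR's own `isTimedOut(response)` is not shown in this excerpt, so the sketch assumes instead that an item counts as timed out once the current time exceeds its `retryDeadlineMs`.

```scala
// Hypothetical, simplified types; not Kafka's ClientResponse or queue item.
final case class Response(disconnected: Boolean, body: String)
final case class QueueItem(request: String, retryDeadlineMs: Long)

sealed trait Outcome
case object TimedOut extends Outcome                // caller's onTimeout would fire
case object Requeued extends Outcome                // item goes back on the queue for a retry
final case class Completed(body: String) extends Outcome // caller's onComplete would fire

object ResponseRouting {
  // Assumption: an item has timed out once the clock passes its retry deadline.
  def isTimedOut(item: QueueItem, nowMs: Long): Boolean = nowMs > item.retryDeadlineMs

  def handleResponse(item: QueueItem, response: Response, nowMs: Long): Outcome =
    if (isTimedOut(item, nowMs)) TimedOut          // deadline is checked first
    else if (response.disconnected) Requeued       // includes auth failures marked as disconnects
    else Completed(response.body)                  // normal completion path
}
```

Because the deadline is checked first, an endlessly disconnecting controller cannot keep a bounded request alive past its deadline. A request with no effective deadline would still retry indefinitely, as suggested above.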







[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r537789168



##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -125,15 +129,25 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
   }
 
   override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                           callback: RequestCompletionHandler): Unit = {
-    requestQueue.put(BrokerToControllerQueueItem(request, callback))
+                           callback: ControllerRequestCompletionHandler,
+                           retryDeadlineMs: Long): Unit = {
+    requestQueue.put(BrokerToControllerQueueItem(request, callback, retryDeadlineMs))
     requestThread.wakeup()
   }
+}
+
+abstract class ControllerRequestCompletionHandler extends RequestCompletionHandler {
 
+  /**
+   * Fire when the request transmission time passes the caller defined deadline on the channel queue.
+   * This is different from the original request's timeout.

Review comment:
       Can we explain how it is different? We want to emphasize that it covers the total time, including any retries, which might be the result of request timeouts.
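Two timeouts are in play here. The per-request timeout bounds a single network attempt. The caller's `retryDeadlineMs` bounds the whole time the item spends in the channel, across retries. The toy simulation below makes the difference concrete under stated assumptions: every attempt fails by hitting the per-request timeout, and the deadline is checked only when an attempt ends. The numbers are illustrative, not Kafka defaults.

```scala
object DeadlineVsRequestTimeout {
  /**
   * Simulates a request whose every attempt times out after requestTimeoutMs
   * and is retried until the channel-level retry deadline has passed.
   * Returns the number of attempts started and the time at which the deadline
   * check finally trips (when the caller's onTimeout would fire).
   */
  def simulate(startMs: Long, requestTimeoutMs: Long, retryDeadlineMs: Long): (Int, Long) = {
    var nowMs = startMs
    var attempts = 0
    // A new attempt starts only while the overall deadline has not passed.
    while (nowMs <= retryDeadlineMs) {
      attempts += 1
      nowMs += requestTimeoutMs // this attempt ends by hitting the per-request timeout
    }
    (attempts, nowMs)
  }
}
```

With a 30 s per-request timeout and a deadline 100 s after enqueueing, `simulate(0L, 30000L, 100000L)` starts four attempts. It reports the timeout at 120 s, once the fourth attempt ends past the deadline. The deadline therefore spans several request timeouts rather than replacing them.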







[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r537958278



##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,34 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends ControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        throw new IllegalStateException("Encountered unexpected timeout when sending AlterIsr to the controller")
+      }
     }
 
     debug(s"Sending AlterIsr to controller $message")
-    controllerChannelManager.sendRequest(new AlterIsrRequest.Builder(message), responseHandler)
+    // We will not timeout AlterISR request, instead letting it retry indefinitely
+    // until a response is received or the request is cancelled after receiving new LeaderAndIsr state.

Review comment:
       Updated the comment
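The updated comment says AlterIsr gets no timeout and instead retries until a response arrives or the request is cancelled. The line that follows the comment in the PR is not part of this excerpt. So the `Long.MaxValue` sentinel below is an assumption about how "no deadline" could be expressed through a `Long` deadline parameter, and `deadlineFrom` is a hypothetical helper that saturates rather than overflows.

```scala
object RetryDeadlines {
  // Sentinel meaning "retry until a response arrives or the request is cancelled".
  val NoDeadline: Long = Long.MaxValue

  // Hypothetical helper: turns a relative timeout into an absolute deadline,
  // saturating at NoDeadline instead of overflowing.
  // Assumes nowMs is non-negative (wall-clock milliseconds).
  def deadlineFrom(nowMs: Long, timeoutMs: Long): Long =
    if (timeoutMs >= NoDeadline - nowMs) NoDeadline else nowMs + timeoutMs

  def hasTimedOut(nowMs: Long, retryDeadlineMs: Long): Boolean =
    nowMs > retryDeadlineMs
}
```

Saturating matters because a naive `nowMs + Long.MaxValue` wraps to a negative value. That would make an "infinite" request time out immediately.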



