You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/12/12 20:19:17 UTC

[kafka] branch 3.3 updated: KAFKA-14392: Fix overly long request timeouts in BrokerToControllerChannelManager (#12856)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 22af3f29ced KAFKA-14392: Fix overly long request timeouts in BrokerToControllerChannelManager (#12856)
22af3f29ced is described below

commit 22af3f29ced9e969c254004f592c06c8a45b84cc
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Fri Dec 9 20:00:19 2022 -0500

    KAFKA-14392: Fix overly long request timeouts in BrokerToControllerChannelManager (#12856)
    
    In BrokerToControllerChannelManager, set the request timeout to the minimum of the retry timeout
    and the controller socket timeout. This fixes some cases where we were unintentionally setting an
    overly long request timeout.
    
    Also, the channel manager used by the BrokerLifecycleManager should set a retry timeout equal to
    half of the broker session timeout, rather than the entire broker session timeout, to allow for a
    retransmission if the initial attempt fails.
    
    These two fixes should address some cases where broker heartbeat requests were not being resent
    in a timely fashion after a network glitch.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, José Armando García Sancio <js...@apache.org>
---
 core/src/main/scala/kafka/server/BrokerServer.scala                   | 2 +-
 .../main/scala/kafka/server/BrokerToControllerChannelManager.scala    | 4 ++--
 core/src/main/scala/kafka/server/ControllerApis.scala                 | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 1008decadb1..32034cb1a4f 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -344,7 +344,7 @@ class BrokerServer(
         config,
         "heartbeat",
         threadNamePrefix,
-        config.brokerSessionTimeoutMs.toLong
+        config.brokerSessionTimeoutMs / 2 // KAFKA-14392
       )
       lifecycleManager.start(
         () => metadataListener.highestMetadataOffset,
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 99e86722f2f..92754a793f5 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -211,7 +211,7 @@ class BrokerToControllerChannelManagerImpl(
         50,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
-        config.requestTimeoutMs,
+        Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, // request timeout should not exceed the provided retry timeout
         config.connectionSetupTimeoutMs,
         config.connectionSetupTimeoutMaxMs,
         time,
@@ -283,7 +283,7 @@ class BrokerToControllerRequestThread(
   time: Time,
   threadName: String,
   retryTimeoutMs: Long
-) extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
+) extends InterBrokerSendThread(threadName, networkClient, Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, time, isInterruptible = false) {
 
   private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
   private val activeController = new AtomicReference[Node](null)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 657c2965533..1c3586263a7 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -576,7 +576,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     val heartbeatRequest = request.body[BrokerHeartbeatRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
-      requestTimeoutMsToDeadlineNs(time, config.brokerHeartbeatIntervalMs))
+      requestTimeoutMsToDeadlineNs(time, config.brokerHeartbeatIntervalMs / 2))
     controller.processBrokerHeartbeat(context, heartbeatRequest.data).handle[Unit] { (reply, e) =>
       def createResponseCallback(requestThrottleMs: Int,
                                  reply: BrokerHeartbeatReply,