You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/04/16 16:14:40 UTC

[kafka] branch trunk updated: KAFKA-9796; Ensure broker shutdown is not stuck when Acceptor is waiting on connection queue (#8448)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9a36d9f  KAFKA-9796; Ensure broker shutdown is not stuck when Acceptor is waiting on connection queue (#8448)
9a36d9f is described below

commit 9a36d9f913e3474a3c4c83e91699759c5fa848a2
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Apr 16 18:14:02 2020 +0200

    KAFKA-9796; Ensure broker shutdown is not stuck when Acceptor is waiting on connection queue (#8448)
    
    This commit reworks the SocketServer to always start the acceptor threads after the processor threads and to always stop the acceptor threads before the processor threads. It ensures that the acceptor shutdown is not blocked waiting on the processors to be fully shut down by decoupling the shutdown signal from the awaiting. It also ensures that the processor threads drain their newConnection queues to unblock acceptors that may be waiting. However, the acceptors still bind during the startu [...]
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../main/scala/kafka/network/SocketServer.scala    | 194 +++++++++++++--------
 core/src/main/scala/kafka/server/KafkaServer.scala |   6 +-
 .../unit/kafka/network/SocketServerTest.scala      |  42 ++++-
 3 files changed, 164 insertions(+), 78 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index aebb436..77c47f0 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -100,29 +100,28 @@ class SocketServer(val config: KafkaConfig,
 
   private var nextProcessorId = 0
   private var connectionQuotas: ConnectionQuotas = _
+  private var startedProcessingRequests = false
   private var stoppedProcessingRequests = false
 
   /**
-   * Start the socket server. Acceptors for all the listeners are started. Processors
-   * are started if `startupProcessors` is true. If not, processors are only started when
-   * [[kafka.network.SocketServer#startDataPlaneProcessors()]] or
-   * [[kafka.network.SocketServer#startControlPlaneProcessor()]] is invoked. Delayed starting of processors
-   * is used to delay processing client connections until server is fully initialized, e.g.
-   * to ensure that all credentials have been loaded before authentications are performed.
-   * Acceptors are always started during `startup` so that the bound port is known when this
-   * method completes even when ephemeral ports are used. Incoming connections on this server
-   * are processed when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
+   * Starts the socket server and creates all the Acceptors and the Processors. The Acceptors
+   * start listening at this stage so that the bound port is known when this method completes
+   * even when ephemeral ports are used. Acceptors and Processors are started if `startProcessingRequests`
+   * is true. If not, acceptors and processors are only started when [[kafka.network.SocketServer#startProcessingRequests()]]
+   * is invoked. Delayed starting of acceptors and processors is used to delay processing client
+   * connections until server is fully initialized, e.g. to ensure that all credentials have been
+   * loaded before authentications are performed. Incoming connections on this server are processed
+   * when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
    *
-   * @param startupProcessors Flag indicating whether `Processor`s must be started.
+   * @param startProcessingRequests Flag indicating whether `Processor`s must be started.
    */
-  def startup(startupProcessors: Boolean = true): Unit = {
+  def startup(startProcessingRequests: Boolean = true): Unit = {
     this.synchronized {
       connectionQuotas = new ConnectionQuotas(config, time)
       createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
       createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
-      if (startupProcessors) {
-        startControlPlaneProcessor(Map.empty)
-        startDataPlaneProcessors(Map.empty)
+      if (startProcessingRequests) {
+        this.startProcessingRequests()
       }
     }
 
@@ -160,22 +159,67 @@ class SocketServer(val config: KafkaConfig,
         Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
       }.getOrElse(0.0)
     })
-    info(s"Started ${dataPlaneAcceptors.size} acceptor threads for data-plane")
-    if (controlPlaneAcceptorOpt.isDefined)
-      info("Started control-plane acceptor thread")
   }
 
   /**
-   * Starts processors of all the data-plane acceptors of this server if they have not already been started.
-   * This method is used for delayed starting of data-plane processors if [[kafka.network.SocketServer#startup]]
-   * was invoked with `startupProcessors=false`.
+   * Start processing requests and new connections. This method is used for delayed starting of
+   * all the acceptors and processors if [[kafka.network.SocketServer#startup]] was invoked with
+   * `startProcessingRequests=false`.
    *
    * Before starting processors for each endpoint, we ensure that authorizer has all the metadata
-   * to authorize requests on that endpoint by waiting on the provided future. We start inter-broker listener
-   * before other listeners. This allows authorization metadata for other listeners to be stored in Kafka topics
-   * in this cluster.
+   * to authorize requests on that endpoint by waiting on the provided future. We start inter-broker
+   * listener before other listeners. This allows authorization metadata for other listeners to be
+   * stored in Kafka topics in this cluster.
+   *
+   * @param authorizerFutures Future per [[EndPoint]] used to wait before starting the processor
+   *                          corresponding to the [[EndPoint]]
    */
-  def startDataPlaneProcessors(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = synchronized {
+  def startProcessingRequests(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
+    info("Starting socket server acceptors and processors")
+    this.synchronized {
+      if (!startedProcessingRequests) {
+        startControlPlaneProcessorAndAcceptor(authorizerFutures)
+        startDataPlaneProcessorsAndAcceptors(authorizerFutures)
+        startedProcessingRequests = true
+      } else {
+        info("Socket server acceptors and processors already started")
+      }
+    }
+    info("Started socket server acceptors and processors")
+  }
+
+  /**
+   * Starts processors of the provided acceptor and the acceptor itself.
+   *
+   * Before starting them, we ensure that authorizer has all the metadata to authorize
+   * requests on that endpoint by waiting on the provided future.
+   */
+  private def startAcceptorAndProcessors(threadPrefix: String,
+                                         endpoint: EndPoint,
+                                         acceptor: Acceptor,
+                                         authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
+    debug(s"Wait for authorizer to complete start up on listener ${endpoint.listenerName}")
+    waitForAuthorizerFuture(acceptor, authorizerFutures)
+    debug(s"Start processors on listener ${endpoint.listenerName}")
+    acceptor.startProcessors(threadPrefix)
+    debug(s"Start acceptor thread on listener ${endpoint.listenerName}")
+    if (!acceptor.isStarted()) {
+      KafkaThread.nonDaemon(
+        s"${threadPrefix}-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}",
+        acceptor
+      ).start()
+      acceptor.awaitStartup()
+    }
+    info(s"Started $threadPrefix acceptor and processor(s) for endpoint : ${endpoint.listenerName}")
+  }
+
+  /**
+   * Starts processors of all the data-plane acceptors and all the acceptors of this server.
+   *
+   * We start inter-broker listener before other listeners. This allows authorization metadata for
+   * other listeners to be stored in Kafka topics in this cluster.
+   */
+  private def startDataPlaneProcessorsAndAcceptors(authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
     val interBrokerListener = dataPlaneAcceptors.asScala.keySet
       .find(_.listenerName == config.interBrokerListenerName)
       .getOrElse(throw new IllegalStateException(s"Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}"))
@@ -183,43 +227,34 @@ class SocketServer(val config: KafkaConfig,
       dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
     orderedAcceptors.foreach { acceptor =>
       val endpoint = acceptor.endPoint
-      debug(s"Wait for authorizer to complete start up on listener ${endpoint.listenerName}")
-      waitForAuthorizerFuture(acceptor, authorizerFutures)
-      debug(s"Start processors on listener ${endpoint.listenerName}")
-      acceptor.startProcessors(DataPlaneThreadPrefix)
+      startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures)
     }
-    info(s"Started data-plane processors for ${dataPlaneAcceptors.size} acceptors")
   }
 
   /**
-   * Start the processor of control-plane acceptor of this server if it has not already been started.
-   * This method is used for delayed starting of control-plane processor if [[kafka.network.SocketServer#startup]]
-   * was invoked with `startupProcessors=false`.
+   * Start the processor of control-plane acceptor and the acceptor of this server.
    */
-  def startControlPlaneProcessor(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = synchronized {
+  private def startControlPlaneProcessorAndAcceptor(authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
     controlPlaneAcceptorOpt.foreach { controlPlaneAcceptor =>
-      waitForAuthorizerFuture(controlPlaneAcceptor, authorizerFutures)
-      controlPlaneAcceptor.startProcessors(ControlPlaneThreadPrefix)
-      info(s"Started control-plane processor for the control-plane acceptor")
+      val endpoint = config.controlPlaneListener.get
+      startAcceptorAndProcessors(ControlPlaneThreadPrefix, endpoint, controlPlaneAcceptor, authorizerFutures)
     }
   }
 
   private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
 
   private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
-                                                    endpoints: Seq[EndPoint]): Unit = synchronized {
+                                                    endpoints: Seq[EndPoint]): Unit = {
     endpoints.foreach { endpoint =>
       connectionQuotas.addListener(config, endpoint.listenerName)
       val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
       addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
-      KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
-      dataPlaneAcceptor.awaitStartup()
       dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
-      info(s"Created data-plane acceptor and processors for endpoint : $endpoint")
+      info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
     }
   }
 
-  private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = synchronized {
+  private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = {
     endpointOpt.foreach { endpoint =>
       connectionQuotas.addListener(config, endpoint.listenerName)
       val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
@@ -231,20 +266,18 @@ class SocketServer(val config: KafkaConfig,
       controlPlaneRequestChannelOpt.foreach(_.addProcessor(controlPlaneProcessor))
       nextProcessorId += 1
       controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlaneThreadPrefix)
-      KafkaThread.nonDaemon(s"${ControlPlaneThreadPrefix}-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", controlPlaneAcceptor).start()
-      controlPlaneAcceptor.awaitStartup()
-      info(s"Created control-plane acceptor and processor for endpoint : $endpoint")
+      info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}")
     }
   }
 
-  private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = synchronized {
+  private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = {
     val sendBufferSize = config.socketSendBufferBytes
     val recvBufferSize = config.socketReceiveBufferBytes
     val brokerId = config.brokerId
     new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas, metricPrefix)
   }
 
-  private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
+  private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = {
     val listenerName = endpoint.listenerName
     val securityProtocol = endpoint.securityProtocol
     val listenerProcessors = new ArrayBuffer[Processor]()
@@ -261,13 +294,13 @@ class SocketServer(val config: KafkaConfig,
   /**
    * Stop processing requests and new connections.
    */
-  def stopProcessingRequests() = {
+  def stopProcessingRequests(): Unit = {
     info("Stopping socket server request processors")
     this.synchronized {
-      dataPlaneAcceptors.asScala.values.foreach(_.shutdown())
-      controlPlaneAcceptorOpt.foreach(_.shutdown())
-      dataPlaneProcessors.asScala.values.foreach(_.shutdown())
-      controlPlaneProcessorOpt.foreach(_.shutdown())
+      dataPlaneAcceptors.asScala.values.foreach(_.initiateShutdown())
+      dataPlaneAcceptors.asScala.values.foreach(_.awaitShutdown())
+      controlPlaneAcceptorOpt.foreach(_.initiateShutdown())
+      controlPlaneAcceptorOpt.foreach(_.awaitShutdown())
       dataPlaneRequestChannel.clear()
       controlPlaneRequestChannelOpt.foreach(_.clear())
       stoppedProcessingRequests = true
@@ -289,7 +322,7 @@ class SocketServer(val config: KafkaConfig,
    * Shutdown the socket server. If still processing requests, shutdown
    * acceptors and processors first.
    */
-  def shutdown() = {
+  def shutdown(): Unit = {
     info("Shutting down socket server")
     this.synchronized {
       if (!stoppedProcessingRequests)
@@ -317,14 +350,20 @@ class SocketServer(val config: KafkaConfig,
   def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
     info(s"Adding data-plane listeners for endpoints $listenersAdded")
     createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, listenersAdded)
-    startDataPlaneProcessors()
+    listenersAdded.foreach { endpoint =>
+      val acceptor = dataPlaneAcceptors.get(endpoint)
+      startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor)
+    }
   }
 
   def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
     info(s"Removing data-plane listeners for endpoints $listenersRemoved")
     listenersRemoved.foreach { endpoint =>
       connectionQuotas.removeListener(config, endpoint.listenerName)
-      dataPlaneAcceptors.asScala.remove(endpoint).foreach(_.shutdown())
+      dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
+        acceptor.initiateShutdown()
+        acceptor.awaitShutdown()
+      }
     }
   }
 
@@ -422,15 +461,24 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
   def wakeup(): Unit
 
   /**
-   * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
+   * Initiates a graceful shutdown by signaling to stop
    */
-  def shutdown(): Unit = {
+  def initiateShutdown(): Unit = {
     if (alive.getAndSet(false))
       wakeup()
-    shutdownLatch.await()
   }
 
   /**
+   * Wait for the thread to completely shutdown
+   */
+  def awaitShutdown(): Unit = shutdownLatch.await
+
+  /**
+   * Returns true if the thread is completely started
+   */
+  def isStarted(): Boolean = startupLatch.getCount == 0
+
+  /**
    * Wait for the thread to completely start up
    */
   def awaitStartup(): Unit = startupLatch.await
@@ -498,8 +546,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
   private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
     processors.foreach { processor =>
-      KafkaThread.nonDaemon(s"${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
-        processor).start()
+      KafkaThread.nonDaemon(
+        s"${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
+        processor
+      ).start()
     }
   }
 
@@ -509,14 +559,22 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     // The processors are then removed from `requestChannel` and any pending responses to these processors are dropped.
     val toRemove = processors.takeRight(removeCount)
     processors.remove(processors.size - removeCount, removeCount)
-    toRemove.foreach(_.shutdown())
+    toRemove.foreach(_.initiateShutdown())
+    toRemove.foreach(_.awaitShutdown())
     toRemove.foreach(processor => requestChannel.removeProcessor(processor.id))
   }
 
-  override def shutdown(): Unit = {
-    super.shutdown()
+  override def initiateShutdown(): Unit = {
+    super.initiateShutdown()
     synchronized {
-      processors.foreach(_.shutdown())
+      processors.foreach(_.initiateShutdown())
+    }
+  }
+
+  override def awaitShutdown(): Unit = {
+    super.awaitShutdown()
+    synchronized {
+      processors.foreach(_.awaitShutdown())
     }
   }
 
@@ -530,7 +588,6 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
       var currentProcessorIndex = 0
       while (isRunning) {
         try {
-
           val ready = nioSelector.select(500)
           if (ready > 0) {
             val keys = nioSelector.selectedKeys()
@@ -542,7 +599,6 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
                 if (key.isAcceptable) {
                   accept(key).foreach { socketChannel =>
-
                     // Assign the channel to the next processor (using round-robin) to which the
                     // channel can be added without blocking. If newConnections queue is full on
                     // all processors, block until the last one is able to accept a connection.
@@ -1038,6 +1094,9 @@ private[kafka] class Processor(val id: Int,
    * Close the selector and all open connections
    */
   private def closeAll(): Unit = {
+    while (!newConnections.isEmpty) {
+      newConnections.poll().close()
+    }
     selector.channels.asScala.foreach { channel =>
       close(channel.id)
     }
@@ -1097,12 +1156,11 @@ private[kafka] class Processor(val id: Int,
    */
   override def wakeup() = selector.wakeup()
 
-  override def shutdown(): Unit = {
-    super.shutdown()
+  override def initiateShutdown(): Unit = {
+    super.initiateShutdown()
     removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
     metrics.removeMetric(expiredConnectionsKilledCountMetricName)
   }
-
 }
 
 class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 8a829b4..f40be9e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -270,7 +270,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         // Delay starting processors until the end of the initialization sequence to ensure
         // that credentials have been loaded before processing authentications.
         socketServer = new SocketServer(config, metrics, time, credentialProvider)
-        socketServer.startup(startupProcessors = false)
+        socketServer.startup(startProcessingRequests = false)
 
         /* start replica manager */
         replicaManager = createReplicaManager(isShuttingDown)
@@ -352,8 +352,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
-        socketServer.startControlPlaneProcessor(authorizerFutures)
-        socketServer.startDataPlaneProcessors(authorizerFutures)
+        socketServer.startProcessingRequests(authorizerFutures)
+
         brokerState.newState(RunningAsBroker)
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 57ecf28..d24ee2e 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -302,7 +302,7 @@ class SocketServerTest {
     testProps.put("inter.broker.listener.name", "INTERNAL")
     val config = KafkaConfig.fromProps(testProps)
     val testableServer = new TestableSocketServer(config)
-    testableServer.startup(startupProcessors = false)
+    testableServer.startup(startProcessingRequests = false)
     val updatedEndPoints = config.advertisedListeners.map { endpoint =>
       endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
     }.map(_.toJava)
@@ -310,6 +310,16 @@ class SocketServerTest {
     val externalReadyFuture = new CompletableFuture[Void]()
     val executor = Executors.newSingleThreadExecutor()
 
+    def controlPlaneListenerStarted() = {
+      try {
+        val socket = connect(testableServer, config.controlPlaneListenerName.get, localAddr = InetAddress.getLocalHost)
+        sendAndReceiveControllerRequest(socket, testableServer)
+        true
+      } catch {
+        case _: Throwable => false
+      }
+    }
+
     def listenerStarted(listenerName: ListenerName) = {
       try {
         val socket = connect(testableServer, listenerName, localAddr = InetAddress.getLocalHost)
@@ -321,14 +331,11 @@ class SocketServerTest {
     }
 
     try {
-      testableServer.startControlPlaneProcessor()
-      val socket1 = connect(testableServer, config.controlPlaneListenerName.get, localAddr = InetAddress.getLocalHost)
-      sendAndReceiveControllerRequest(socket1, testableServer)
-
       val externalListener = new ListenerName("EXTERNAL")
       val externalEndpoint = updatedEndPoints.find(e => e.listenerName.get == externalListener.value).get
-      val futures =  Map(externalEndpoint -> externalReadyFuture)
-      val startFuture = executor.submit((() => testableServer.startDataPlaneProcessors(futures)): Runnable)
+      val futures = Map(externalEndpoint -> externalReadyFuture)
+      val startFuture = executor.submit((() => testableServer.startProcessingRequests(futures)): Runnable)
+      TestUtils.waitUntilTrue(() => controlPlaneListenerStarted(), "Control plane listener not started")
       TestUtils.waitUntilTrue(() => listenerStarted(config.interBrokerListenerName), "Inter-broker listener not started")
       assertFalse("Socket server startup did not wait for future to complete", startFuture.isDone)
 
@@ -343,6 +350,27 @@ class SocketServerTest {
   }
 
   @Test
+  def testStagedListenerShutdownWhenConnectionQueueIsFull(): Unit = {
+    val testProps = new Properties
+    testProps ++= props
+    testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0")
+    testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
+    testProps.put("control.plane.listener.name", "CONTROLLER")
+    testProps.put("inter.broker.listener.name", "INTERNAL")
+    val config = KafkaConfig.fromProps(testProps)
+    val connectionQueueSize = 1
+    val testableServer = new TestableSocketServer(config, connectionQueueSize)
+    testableServer.startup(startProcessingRequests = false)
+
+    val socket1 = connect(testableServer, new ListenerName("EXTERNAL"), localAddr = InetAddress.getLocalHost)
+    sendRequest(socket1, producerRequestBytes())
+    val socket2 = connect(testableServer, new ListenerName("EXTERNAL"), localAddr = InetAddress.getLocalHost)
+    sendRequest(socket2, producerRequestBytes())
+
+    testableServer.shutdown()
+  }
+
+  @Test
   def testControlPlaneRequest(): Unit = {
     val testProps = new Properties
     testProps ++= props