Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/21 05:40:19 UTC

[kafka] branch trunk updated: MINOR: Remove acceptor creation in network thread update code (#4742)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2f90cb8  MINOR: Remove acceptor creation in network thread update code (#4742)
2f90cb8 is described below

commit 2f90cb86c18e854fd687ea7bbeb8692178bbb02d
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Mar 21 05:40:16 2018 +0000

    MINOR: Remove acceptor creation in network thread update code (#4742)
    
    Fix dynamic addition of network threads to only create new Processor threads and not the Acceptor.
---
 .../main/scala/kafka/network/SocketServer.scala    | 42 +++++++++++++---------
 .../server/DynamicBrokerReconfigurationTest.scala  |  5 ++-
 2 files changed, 29 insertions(+), 18 deletions(-)
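
The shape of the change, as a minimal standalone sketch (simplified class and method names, not Kafka's actual SocketServer, Acceptor or Processor API): acceptor creation and processor creation are split apart, so startup() and addListeners() create both, while a dynamic resize only attaches new Processors to the Acceptors that already exist.

import scala.collection.mutable

// Illustrative stand-ins only; the real Acceptor/Processor run as threads and
// register with a RequestChannel, which is omitted here.
class Processor(val id: Int)

class Acceptor(val endpoint: String) {
  val processors = mutable.ArrayBuffer.empty[Processor]
  def addProcessors(ps: Seq[Processor]): Unit = processors ++= ps
}

class MiniSocketServer {
  private val acceptors = mutable.Map.empty[String, Acceptor]
  private var nextProcessorId = 0

  // startup() and addListeners() go through here: one Acceptor per endpoint,
  // plus the initial set of Processors for that listener.
  def createAcceptorAndProcessors(processorsPerListener: Int, endpoints: Seq[String]): Unit =
    endpoints.foreach { endpoint =>
      val acceptor = new Acceptor(endpoint)
      acceptors.put(endpoint, acceptor)
      addProcessors(acceptor, processorsPerListener)
    }

  // Dynamic growth only goes through here: new Processors are attached to an
  // existing Acceptor, and no second Acceptor is ever created for a listener.
  def addProcessors(acceptor: Acceptor, newProcessors: Int): Unit = {
    val created = (0 until newProcessors).map { _ =>
      val p = new Processor(nextProcessorId)
      nextProcessorId += 1
      p
    }
    acceptor.addProcessors(created)
  }

  // Shrinking (removeProcessors) is omitted from this sketch.
  def resizeThreadPool(oldSize: Int, newSize: Int): Unit =
    if (newSize > oldSize)
      acceptors.values.foreach(addProcessors(_, newSize - oldSize))

  def processorCounts: Map[String, Int] =
    acceptors.map { case (endpoint, acceptor) => endpoint -> acceptor.processors.size }.toMap
}

Before this change, the growth branch of resizeThreadPool went through the combined create path for every listener, which constructed and started a fresh Acceptor each time the pool grew; the split confines resizing to the Processor side.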

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 71491f2..e4fdb08 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -81,7 +81,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   def startup() {
     this.synchronized {
       connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
-      createProcessors(config.numNetworkThreads, config.listeners)
+      createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
     }
 
     newGauge("NetworkProcessorAvgIdlePercent",
@@ -112,8 +112,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
 
-  private def createProcessors(newProcessorsPerListener: Int,
-                               endpoints: Seq[EndPoint]): Unit = synchronized {
+  private def createAcceptorAndProcessors(processorsPerListener: Int,
+                                          endpoints: Seq[EndPoint]): Unit = synchronized {
 
     val sendBufferSize = config.socketSendBufferBytes
     val recvBufferSize = config.socketReceiveBufferBytes
@@ -122,24 +122,30 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     endpoints.foreach { endpoint =>
       val listenerName = endpoint.listenerName
       val securityProtocol = endpoint.securityProtocol
-      val listenerProcessors = new ArrayBuffer[Processor]()
-
-      for (_ <- 0 until newProcessorsPerListener) {
-        val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
-        listenerProcessors += processor
-        requestChannel.addProcessor(processor)
-        nextProcessorId += 1
-      }
-      listenerProcessors.foreach(p => processors.put(p.id, p))
 
       val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
       KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
       acceptor.awaitStartup()
       acceptors.put(endpoint, acceptor)
-      acceptor.addProcessors(listenerProcessors)
+      addProcessors(acceptor, endpoint, processorsPerListener)
     }
   }
 
+  private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
+    val listenerName = endpoint.listenerName
+    val securityProtocol = endpoint.securityProtocol
+    val listenerProcessors = new ArrayBuffer[Processor]()
+
+    for (_ <- 0 until newProcessorsPerListener) {
+      val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
+      listenerProcessors += processor
+      requestChannel.addProcessor(processor)
+      nextProcessorId += 1
+    }
+    listenerProcessors.foreach(p => processors.put(p.id, p))
+    acceptor.addProcessors(listenerProcessors)
+  }
+
   /**
     * Stop processing requests and new connections.
     */
@@ -156,9 +162,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized {
     info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads")
-    if (newNumNetworkThreads > oldNumNetworkThreads)
-      createProcessors(newNumNetworkThreads - oldNumNetworkThreads, config.listeners)
-    else if (newNumNetworkThreads < oldNumNetworkThreads)
+    if (newNumNetworkThreads > oldNumNetworkThreads) {
+      acceptors.asScala.foreach { case (endpoint, acceptor) =>
+        addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads)
+      }
+    } else if (newNumNetworkThreads < oldNumNetworkThreads)
       acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
   }
 
@@ -187,7 +195,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
     info(s"Adding listeners for endpoints $listenersAdded")
-    createProcessors(config.numNetworkThreads, listenersAdded)
+    createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded)
   }
 
   def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
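
Continuing the simplified MiniSocketServer sketch above (same caveats about names), growing from 3 to 5 network threads per listener now only adds two Processors to each existing Acceptor; the acceptor threads themselves are untouched:

object ResizeSketch {
  def main(args: Array[String]): Unit = {
    val server = new MiniSocketServer
    server.createAcceptorAndProcessors(
      processorsPerListener = 3,
      endpoints = Seq("PLAINTEXT://:9092", "SSL://:9093"))

    // Grow the pool: only the Processor count per listener changes.
    server.resizeThreadPool(oldSize = 3, newSize = 5)
    println(server.processorCounts)  // both listeners now report 5 processors
  }
}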
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index f0bd61a..833a98e 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -420,7 +420,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     }
 
     def increasePoolSize(propName: String, currentSize: => Int, threadPrefix: String): Int = {
-      val newSize = currentSize * 2 - 1
+      val newSize = if (currentSize == 1) currentSize * 2 else currentSize * 2 - 1
       resizeThreadPool(propName, newSize, threadPrefix)
       newSize
     }
@@ -444,6 +444,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
         Thread.sleep(100)
       }
       stopAndVerifyProduceConsume(producerThread, consumerThread, mayReceiveDuplicates)
+      // Verify that all threads are alive
+      maybeVerifyThreadPoolSize(propName, threadPoolSize, threadPrefix)
     }
 
     val config = servers.head.config
@@ -457,6 +459,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       "", mayReceiveDuplicates = false)
     verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
       networkThreadPrefix, mayReceiveDuplicates = true)
+    verifyThreads("kafka-socket-acceptor-", config.listeners.size)
 
     verifyProcessorMetrics()
     verifyMarkPartitionsForTruncation()
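
Two things worth noting in the test change: increasePoolSize special-cases a starting size of 1, because 1 * 2 - 1 is still 1 and the resize would be a no-op, and the new verifyThreads call asserts that, after all the resizing, there is still exactly one acceptor thread per configured listener. A rough sketch of that kind of check (a hypothetical helper, not the test's actual verifyThreads implementation) just counts live JVM threads by name prefix:

import scala.collection.JavaConverters._

object ThreadCountSketch {
  // Count live threads whose names start with the given prefix, e.g.
  // "kafka-socket-acceptor-" for acceptors or the network thread prefix
  // for processors.
  def countThreads(prefix: String): Int =
    Thread.getAllStackTraces.keySet.asScala.count(_.getName.startsWith(prefix))
}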
