You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/21 05:40:19 UTC
[kafka] branch trunk updated: MINOR: Remove acceptor creation in network thread update code (#4742)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2f90cb8 MINOR: Remove acceptor creation in network thread update code (#4742)
2f90cb8 is described below
commit 2f90cb86c18e854fd687ea7bbeb8692178bbb02d
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Mar 21 05:40:16 2018 +0000
MINOR: Remove acceptor creation in network thread update code (#4742)
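Fix dynamic addition of network threads so that increasing the thread pool size only creates new Processor threads for the existing Acceptors, rather than also creating a new Acceptor for each listener.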
---
.../main/scala/kafka/network/SocketServer.scala | 42 +++++++++++++---------
.../server/DynamicBrokerReconfigurationTest.scala | 5 ++-
2 files changed, 29 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 71491f2..e4fdb08 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -81,7 +81,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def startup() {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
- createProcessors(config.numNetworkThreads, config.listeners)
+ createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
}
newGauge("NetworkProcessorAvgIdlePercent",
@@ -112,8 +112,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
- private def createProcessors(newProcessorsPerListener: Int,
- endpoints: Seq[EndPoint]): Unit = synchronized {
+ private def createAcceptorAndProcessors(processorsPerListener: Int,
+ endpoints: Seq[EndPoint]): Unit = synchronized {
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
@@ -122,24 +122,30 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
endpoints.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
- val listenerProcessors = new ArrayBuffer[Processor]()
-
- for (_ <- 0 until newProcessorsPerListener) {
- val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
- listenerProcessors += processor
- requestChannel.addProcessor(processor)
- nextProcessorId += 1
- }
- listenerProcessors.foreach(p => processors.put(p.id, p))
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
acceptor.awaitStartup()
acceptors.put(endpoint, acceptor)
- acceptor.addProcessors(listenerProcessors)
+ addProcessors(acceptor, endpoint, processorsPerListener)
}
}
+ private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
+ val listenerName = endpoint.listenerName
+ val securityProtocol = endpoint.securityProtocol
+ val listenerProcessors = new ArrayBuffer[Processor]()
+
+ for (_ <- 0 until newProcessorsPerListener) {
+ val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
+ listenerProcessors += processor
+ requestChannel.addProcessor(processor)
+ nextProcessorId += 1
+ }
+ listenerProcessors.foreach(p => processors.put(p.id, p))
+ acceptor.addProcessors(listenerProcessors)
+ }
+
/**
* Stop processing requests and new connections.
*/
@@ -156,9 +162,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized {
info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads")
- if (newNumNetworkThreads > oldNumNetworkThreads)
- createProcessors(newNumNetworkThreads - oldNumNetworkThreads, config.listeners)
- else if (newNumNetworkThreads < oldNumNetworkThreads)
+ if (newNumNetworkThreads > oldNumNetworkThreads) {
+ acceptors.asScala.foreach { case (endpoint, acceptor) =>
+ addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads)
+ }
+ } else if (newNumNetworkThreads < oldNumNetworkThreads)
acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
}
@@ -187,7 +195,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
info(s"Adding listeners for endpoints $listenersAdded")
- createProcessors(config.numNetworkThreads, listenersAdded)
+ createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded)
}
def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index f0bd61a..833a98e 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -420,7 +420,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
def increasePoolSize(propName: String, currentSize: => Int, threadPrefix: String): Int = {
- val newSize = currentSize * 2 - 1
+ val newSize = if (currentSize == 1) currentSize * 2 else currentSize * 2 - 1
resizeThreadPool(propName, newSize, threadPrefix)
newSize
}
@@ -444,6 +444,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
Thread.sleep(100)
}
stopAndVerifyProduceConsume(producerThread, consumerThread, mayReceiveDuplicates)
+ // Verify that all threads are alive
+ maybeVerifyThreadPoolSize(propName, threadPoolSize, threadPrefix)
}
val config = servers.head.config
@@ -457,6 +459,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
"", mayReceiveDuplicates = false)
verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
networkThreadPrefix, mayReceiveDuplicates = true)
+ verifyThreads("kafka-socket-acceptor-", config.listeners.size)
verifyProcessorMetrics()
verifyMarkPartitionsForTruncation()
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.