Posted to dev@kafka.apache.org by "RivenSun (Jira)" <ji...@apache.org> on 2021/11/09 07:34:00 UTC
[jira] [Created] (KAFKA-13437) Broker parameter optimization:
security.inter.broker.protocol and num.network.threads
RivenSun created KAFKA-13437:
--------------------------------
Summary: Broker parameter optimization: security.inter.broker.protocol and num.network.threads
Key: KAFKA-13437
URL: https://issues.apache.org/jira/browse/KAFKA-13437
Project: Kafka
Issue Type: Improvement
Components: core
Reporter: RivenSun
h1. 1. security.inter.broker.protocol
First, consider the documentation for this parameter:
{code:java}
security.inter.broker.protocol: Security protocol used to communicate between brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. It is an error to set this and inter.broker.listener.name properties at the same time. {code}
The documentation does not say that, when this setting is used, the resulting inter-broker listener name is simply the value of security.inter.broker.protocol. I had assumed the broker would look up a suitable listener name in the listener.security.protocol.map configuration.
As a result, the broker fails to start:
{code:java}
[2021-11-09 06:28:44,058] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are SASL_PLAINTEXT,INTERNAL_SSL,PLAIN_PLUGIN_SSL
at scala.Predef$.require(Predef.scala:337)
at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1952)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1897)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1394)
at kafka.Kafka$.buildServer(Kafka.scala:67)
at kafka.Kafka$.main(Kafka.scala:87)
at kafka.Kafka.main(Kafka.scala)
{code}
h1. 2. num.network.threads
The network threads configured by this parameter are not shared across listeners. Instead, each listener creates its own set of num.network.threads network processors, so the Kafka process opens many unnecessary threads and wastes resources.
For example:
listenerNameA: used for communication between brokers
listenerNameB: used by clients to produce and fetch messages
listenerNameC: used by Kafka operators to manage the cluster and issue control requests, such as deleting topics or adding partitions
Ideally, the number of network threads for listenerNameB would be increased, and the number for the other two listeners reduced accordingly.
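The thread growth described above can be illustrated with a minimal sketch. The helper totalProcessors and the value 8 are hypothetical and chosen only for illustration; the listener names come from the example in this report.

```scala
// Minimal sketch, not Kafka code: every listener gets its own pool of
// num.network.threads processors, so the total grows with the listener count.
object NetworkThreadCount {
  // Hypothetical helper: total data-plane processors across all listeners.
  def totalProcessors(listeners: Seq[String], numNetworkThreads: Int): Int =
    listeners.size * numNetworkThreads

  def main(args: Array[String]): Unit = {
    val listeners = Seq("listenerNameA", "listenerNameB", "listenerNameC")
    // Assumed value, sized for the client traffic on listenerNameB.
    val numNetworkThreads = 8
    // All three listeners receive 8 processors each, although only one needs them.
    println(s"total processors: ${totalProcessors(listeners, numNetworkThreads)}")
  }
}
```

Raising num.network.threads to serve the client-facing listener therefore raises the thread count of every other listener by the same amount.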
h1. Root cause
1. See the "getInterBrokerListenerNameAndSecurityProtocol" method in KafkaConfig.scala:
{code:java}
private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
  Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match {
    case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) =>
      throw new ConfigException(s"Only one of ${KafkaConfig.InterBrokerListenerNameProp} and " +
        s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.")
    case Some(name) =>
      val listenerName = ListenerName.normalised(name)
      val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
        throw new ConfigException(s"Listener with name ${listenerName.value} defined in " +
          s"${KafkaConfig.InterBrokerListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
      (listenerName, securityProtocol)
    case None =>
      val securityProtocol = getSecurityProtocol(getString(KafkaConfig.InterBrokerSecurityProtocolProp),
        KafkaConfig.InterBrokerSecurityProtocolProp)
      (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
  }
} {code}
ListenerName.forSecurityProtocol(securityProtocol) sets the inter-broker listener name to securityProtocol.name, regardless of the listener names defined in listener.security.protocol.map.
2. See the "addDataPlaneProcessors" method in SocketServer.scala.
This method creates newProcessorsPerListener processors for each EndPoint, where newProcessorsPerListener is config.numNetworkThreads.
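The first root cause can be reproduced in isolation with a small stand-alone sketch. It does not use Kafka classes: listenerNameFor is a stand-in for ListenerName.forSecurityProtocol, and isDefined only approximates the validation that raised the startup error. The listener names are taken from the error message in this report; the value "SSL" is an assumption, since the report does not state which protocol was configured.

```scala
// Minimal sketch, not Kafka code.
object InterBrokerListenerSketch {
  // Stand-in for ListenerName.forSecurityProtocol: the listener name is
  // simply the name of the security protocol.
  def listenerNameFor(securityProtocol: String): String = securityProtocol

  // Approximates the check that inter.broker.listener.name must be one of
  // the configured listener names.
  def isDefined(listenerName: String, advertisedListeners: Set[String]): Boolean =
    advertisedListeners.contains(listenerName)

  def main(args: Array[String]): Unit = {
    // Listener names from the error message in this report.
    val advertised = Set("SASL_PLAINTEXT", "INTERNAL_SSL", "PLAIN_PLUGIN_SSL")
    // Assumed value of security.inter.broker.protocol.
    val derived = listenerNameFor("SSL")
    println(s"derived listener name: $derived, defined: ${isDefined(derived, advertised)}")
  }
}
```

Because the derived name is the protocol name itself, validation fails whenever the listeners use custom names, even if one of them maps to the requested protocol.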
h1. Suggestion
# Optimize the getInterBrokerListenerNameAndSecurityProtocol method.
Use listenerSecurityProtocolMap to find a suitable listener name for security.inter.broker.protocol.
If several keys in listenerSecurityProtocolMap map to the value of security.inter.broker.protocol, return the listener name of the first such key.
# Allow the number of network threads to be configured separately for each listener, following the way the parameter sasl.server.callback.handler.class is defined.
num.network.threads is used as the default value.
*listener.name.\{listenerName}.num.network.threads* is used as the per-listener override.
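Both suggestions can be sketched as follows. This is an illustration under stated assumptions, not a proposed patch: findInterBrokerListener and networkThreadsFor are hypothetical helpers, the protocol map entries are invented for the example, and the lowercase listener name in the property key is assumed to follow the existing listener.name prefix convention. An ordered Seq is used so that "the first key" is well defined.

```scala
import java.util.Locale

// Minimal sketch, not Kafka code.
object SuggestionSketch {
  // Suggestion 1: pick the first listener whose protocol matches
  // security.inter.broker.protocol. Entries are (listenerName, securityProtocol).
  def findInterBrokerListener(protocolMap: Seq[(String, String)],
                              interBrokerProtocol: String): Option[String] =
    protocolMap.collectFirst { case (listener, protocol) if protocol == interBrokerProtocol => listener }

  // Suggestion 2: a per-listener override falls back to num.network.threads,
  // then to 3, the documented default of num.network.threads.
  def networkThreadsFor(listener: String, props: Map[String, String]): Int =
    props.get(s"listener.name.${listener.toLowerCase(Locale.ROOT)}.num.network.threads")
      .orElse(props.get("num.network.threads"))
      .map(_.toInt)
      .getOrElse(3)

  def main(args: Array[String]): Unit = {
    // Hypothetical mapping for the listener names in this report.
    val protocolMap = Seq(
      "SASL_PLAINTEXT" -> "SASL_PLAINTEXT",
      "INTERNAL_SSL" -> "SSL",
      "PLAIN_PLUGIN_SSL" -> "SSL")
    println(findInterBrokerListener(protocolMap, "SSL"))

    // Hypothetical broker properties with one per-listener override.
    val props = Map(
      "num.network.threads" -> "3",
      "listener.name.plain_plugin_ssl.num.network.threads" -> "8")
    println(networkThreadsFor("PLAIN_PLUGIN_SSL", props))
    println(networkThreadsFor("INTERNAL_SSL", props))
  }
}
```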
--
This message was sent by Atlassian Jira
(v8.20.1#820001)