You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "RivenSun (Jira)" <ji...@apache.org> on 2021/11/09 07:34:00 UTC

[jira] [Created] (KAFKA-13437) Broker parameter optimization: security.inter.broker.protocol and num.network.threads

RivenSun created KAFKA-13437:
--------------------------------

             Summary: Broker parameter optimization: security.inter.broker.protocol and num.network.threads
                 Key: KAFKA-13437
                 URL: https://issues.apache.org/jira/browse/KAFKA-13437
             Project: Kafka
          Issue Type: Improvement
          Components: core
            Reporter: RivenSun


h1. 1. security.inter.broker.protocol

Firstly see this parameter comment
{code:java}
security.inter.broker.protocolSecurity protocol used to communicate between brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. It is an error to set this and inter.broker.listener.name properties at the same time. {code}
We will not know from the comments, after using this configuration, the final value of InterBrokerListenerName is the same as the value of security.inter.broker.protocol. I originally thought it would find a suitable listenerName from the listener.security.protocol.map configuration.

The result is: broker startup failed

 
{code:java}
[2021-11-09 06:28:44,058] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are SASL_PLAINTEXT,INTERNAL_SSL,PLAIN_PLUGIN_SSL
        at scala.Predef$.require(Predef.scala:337)
        at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1952)
        at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1897)
        at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1394)
        at kafka.Kafka$.buildServer(Kafka.scala:67)
        at kafka.Kafka$.main(Kafka.scala:87)
        at kafka.Kafka.main(Kafka.scala)
 {code}
 

 
h1. 2. num.network.threads

The networkThreads corresponding to this parameter are not shared by all listeners, but each listener will create the same number of networkProcessors, which causes the Kafka process to open too many unnecessary threads, which leads to a waste of resources.
for example:
listenerNameA: used for communication between brokers
listenerNameB: used to connect production messages and fetch messages on the client side
listenerNameC: Used by Kafka operation and maintenance personnel to manage the cluster and issue control type requests, such as deleting topics or adding partitions, etc.


So as expected, the num.network.threads of listenerNameB should be increased, and the networkThreads of the other two listeners can be appropriately reduced

 
h1. Rootcause:

1. See "getInterBrokerListenerNameAndSecurityProtocol" method in 

KafkaConfig.scala
{code:java}
private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
  Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match {
    case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) =>
      throw new ConfigException(s"Only one of ${KafkaConfig.InterBrokerListenerNameProp} and " +
        s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.")
    case Some(name) =>
      val listenerName = ListenerName.normalised(name)
      val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
        throw new ConfigException(s"Listener with name ${listenerName.value} defined in " +
          s"${KafkaConfig.InterBrokerListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
      (listenerName, securityProtocol)
    case None =>
      val securityProtocol = getSecurityProtocol(getString(KafkaConfig.InterBrokerSecurityProtocolProp),
        KafkaConfig.InterBrokerSecurityProtocolProp)
      (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
  }
} {code}
ListenerName.forSecurityProtocol(securityProtocol) limits the value of InterBrokerListenerName to the value of securityProtocol.name

2. See "addDataPlaneProcessors" method in SocketServer.scala

In this method, processors of the size of newProcessorsPerListener are created for each EndPoint, the value of newProcessorsPerListener is config.numNetworkThreads

 
h1. Suggestion
 # Optimize the getInterBrokerListenerNameAndSecurityProtocol method.
Use listenerSecurityProtocolMap to find a suitable listenerName for security.inter.broker.protocol.
If there are multiple keys in the listenerSecurityProtocolMap with "mapKey.value='value of security.inter.broker.protocol'", the listenerName corresponding to the first key is returned.
 # The number of network threads can be configured separately for each listenerName, refer to the definition of the parameter sasl.server.callback.handler.class,
num.network.threads is used as the default value
*listener.name.\{listenerName}.num.network.threads* is used as the private configuration of each listener.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)