You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "RivenSun (Jira)" <ji...@apache.org> on 2021/12/28 10:11:00 UTC

[jira] [Commented] (KAFKA-13437) Broker parameter optimization: security.inter.broker.protocol and num.network.threads

    [ https://issues.apache.org/jira/browse/KAFKA-13437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17466055#comment-17466055 ] 

RivenSun commented on KAFKA-13437:
----------------------------------

Hi [~guozhang] , [~showuon]

can you give any suggestions?
Thanks.

> Broker parameter optimization: security.inter.broker.protocol and num.network.threads
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13437
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>
> h1. 1. security.inter.broker.protocol
> Firstly see this parameter comment
> {code:java}
> security.inter.broker.protocolSecurity protocol used to communicate between brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. It is an error to set this and inter.broker.listener.name properties at the same time. {code}
> We will not know from the comments, after using this configuration, the final value of InterBrokerListenerName is the same as the value of security.inter.broker.protocol. I originally thought it would find a suitable listenerName from the listener.security.protocol.map configuration.
> The result is: broker startup failed
>  
> {code:java}
> [2021-11-09 06:28:44,058] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
> java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are SASL_PLAINTEXT,INTERNAL_SSL,PLAIN_PLUGIN_SSL
>         at scala.Predef$.require(Predef.scala:337)
>         at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1952)
>         at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1897)
>         at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1394)
>         at kafka.Kafka$.buildServer(Kafka.scala:67)
>         at kafka.Kafka$.main(Kafka.scala:87)
>         at kafka.Kafka.main(Kafka.scala)
>  {code}
>  
>  
> h1. 2. num.network.threads
> The networkThreads corresponding to this parameter are not shared by all listeners, but each listener will create the same number of networkProcessors, which causes the Kafka process to open too many unnecessary threads, which leads to a waste of resources.
> for example:
> listenerNameA: used for communication between brokers
> listenerNameB: used to connect production messages and fetch messages on the client side
> listenerNameC: Used by Kafka operation and maintenance personnel to manage the cluster and send control type requests, such as deleting topics or adding partitions, etc.
> So as expected, the num.network.threads of listenerNameB should be increased, and the networkThreads of the other two listeners can be appropriately reduced
>  
> h1. Rootcause:
> 1. See "getInterBrokerListenerNameAndSecurityProtocol" method in 
> KafkaConfig.scala
> {code:java}
> private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
>   Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match {
>     case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) =>
>       throw new ConfigException(s"Only one of ${KafkaConfig.InterBrokerListenerNameProp} and " +
>         s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.")
>     case Some(name) =>
>       val listenerName = ListenerName.normalised(name)
>       val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
>         throw new ConfigException(s"Listener with name ${listenerName.value} defined in " +
>           s"${KafkaConfig.InterBrokerListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
>       (listenerName, securityProtocol)
>     case None =>
>       val securityProtocol = getSecurityProtocol(getString(KafkaConfig.InterBrokerSecurityProtocolProp),
>         KafkaConfig.InterBrokerSecurityProtocolProp)
>       (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
>   }
> } {code}
> ListenerName.forSecurityProtocol(securityProtocol) limits the value of InterBrokerListenerName to the value of securityProtocol.name
> 2. See "addDataPlaneProcessors" method in SocketServer.scala
> In this method, processors of the size of newProcessorsPerListener are created for each EndPoint, the value of newProcessorsPerListener is config.numNetworkThreads
>  
> h1. Suggestion
>  # Optimize the getInterBrokerListenerNameAndSecurityProtocol method.
> Use listenerSecurityProtocolMap to find a suitable listenerName for security.inter.broker.protocol.
> If there are multiple keys in the listenerSecurityProtocolMap with "mapKey.value='value of security.inter.broker.protocol'", the listenerName corresponding to the first key is returned.
>  # The number of network threads can be configured separately for each listenerName, refer to the definition of the parameter sasl.server.callback.handler.class,
> num.network.threads is used as the default value
> *listener.name.\{listenerName}.num.network.threads* is used as the private configuration of each listener.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)