Posted to jira@kafka.apache.org by "RivenSun (Jira)" <ji...@apache.org> on 2021/11/09 07:36:00 UTC

[jira] [Updated] (KAFKA-13437) Broker parameter optimization: security.inter.broker.protocol and num.network.threads

     [ https://issues.apache.org/jira/browse/KAFKA-13437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

RivenSun updated KAFKA-13437:
-----------------------------
    Description: 
h1. 1. security.inter.broker.protocol

First, see the documentation for this parameter:
{code:java}
security.inter.broker.protocol: Security protocol used to communicate between brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. It is an error to set this and inter.broker.listener.name properties at the same time. {code}
The documentation does not make clear that, when this configuration is used, the resulting inter-broker listener name is identical to the value of security.inter.broker.protocol. I originally expected Kafka to look up a suitable listener name in the listener.security.protocol.map configuration.

As a result, the broker fails to start:

 
{code:java}
[2021-11-09 06:28:44,058] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are SASL_PLAINTEXT,INTERNAL_SSL,PLAIN_PLUGIN_SSL
        at scala.Predef$.require(Predef.scala:337)
        at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1952)
        at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1897)
        at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1394)
        at kafka.Kafka$.buildServer(Kafka.scala:67)
        at kafka.Kafka$.main(Kafka.scala:87)
        at kafka.Kafka.main(Kafka.scala)
 {code}
 

 
h1. 2. num.network.threads

The network threads configured by this parameter are not shared across listeners. Instead, each listener creates its own set of num.network.threads network processors, so a broker with several listeners starts more threads than it needs and wastes resources.
For example:
listenerNameA: used for communication between brokers
listenerNameB: used by clients to produce and fetch messages
listenerNameC: used by Kafka operations staff to manage the cluster and send control requests, such as deleting topics or adding partitions

Ideally, the number of network threads for listenerNameB would be increased, while the number for the other two listeners could be reduced.

 
h1. Root cause

1. See the "getInterBrokerListenerNameAndSecurityProtocol" method in KafkaConfig.scala:
{code:java}
private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
  Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match {
    case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) =>
      throw new ConfigException(s"Only one of ${KafkaConfig.InterBrokerListenerNameProp} and " +
        s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.")
    case Some(name) =>
      val listenerName = ListenerName.normalised(name)
      val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
        throw new ConfigException(s"Listener with name ${listenerName.value} defined in " +
          s"${KafkaConfig.InterBrokerListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
      (listenerName, securityProtocol)
    case None =>
      val securityProtocol = getSecurityProtocol(getString(KafkaConfig.InterBrokerSecurityProtocolProp),
        KafkaConfig.InterBrokerSecurityProtocolProp)
      (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
  }
} {code}
In the None branch, ListenerName.forSecurityProtocol(securityProtocol) sets the inter-broker listener name to securityProtocol.name, regardless of the listener names defined in listener.security.protocol.map.
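
To make the effect concrete, here is a minimal, self-contained Scala sketch that models this fallback without depending on Kafka classes. The listener names INTERNAL and CLIENT, the map contents, and the helpers deriveListenerName and isConfiguredListener are illustrative assumptions, not Kafka code. Only the rule "listener name = protocol name" is taken from the method shown above.

```scala
object InterBrokerListenerFallback {
  // Hypothetical listener.security.protocol.map: listener name -> security protocol.
  val listenerSecurityProtocolMap: Map[String, String] = Map(
    "INTERNAL" -> "SSL",
    "CLIENT"   -> "SASL_SSL"
  )

  // Models the `case None` branch: the listener name is simply the protocol name.
  def deriveListenerName(interBrokerProtocol: String): String = interBrokerProtocol

  // Simplified stand-in for the startup check: the derived name must be a configured listener.
  def isConfiguredListener(name: String): Boolean =
    listenerSecurityProtocolMap.contains(name)

  def main(args: Array[String]): Unit = {
    val derived = deriveListenerName("SSL")
    // "SSL" is the protocol used by INTERNAL, but no listener is *named* "SSL",
    // so a check like the one in validateValues would reject it.
    println(s"derived listener name = $derived, configured = ${isConfiguredListener(derived)}")
  }
}
```

Under these assumptions the derived name is never looked up by protocol, which matches the startup failure described in section 1.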

2. See the "addDataPlaneProcessors" method in SocketServer.scala.

This method creates newProcessorsPerListener processors for each EndPoint, and the value of newProcessorsPerListener is config.numNetworkThreads.
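
The resulting thread count can be illustrated with a small Scala sketch. The helper totalProcessors and the value 8 for num.network.threads are assumptions chosen for illustration. The listener names are the ones from the example in section 2.

```scala
object ProcessorCount {
  // Total processor threads when every listener gets the same pool size.
  def totalProcessors(listeners: Seq[String], numNetworkThreads: Int): Int =
    listeners.size * numNetworkThreads

  def main(args: Array[String]): Unit = {
    val listeners = Seq("listenerNameA", "listenerNameB", "listenerNameC")
    // With a hypothetical num.network.threads=8, three listeners yield 3 * 8 = 24 threads.
    println(totalProcessors(listeners, 8))
  }
}
```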

 
h1. Suggestion
 # Optimize the getInterBrokerListenerNameAndSecurityProtocol method.
Use listenerSecurityProtocolMap to find a suitable listener name for security.inter.broker.protocol.
If several keys in listenerSecurityProtocolMap map to the value of security.inter.broker.protocol, return the listener name of the first matching key.
 # Allow the number of network threads to be configured separately for each listener, following the pattern of the sasl.server.callback.handler.class parameter.
num.network.threads is used as the default value.
*listener.name.\{listenerName}.num.network.threads* is used as the listener-specific override.
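
The two suggestions above could look roughly like the following Scala sketch. It is a standalone model, not a patch against KafkaConfig or SocketServer. The object and method names, the sample listener names, and the property values are hypothetical. The lowercase listener name in the property key and the fallback value of 3 are assumptions meant to mirror existing per-listener prefixes and the documented default of num.network.threads.

```scala
import java.util.Locale

object ProposedBrokerConfig {
  // Suggestion 1: return the first listener whose protocol matches
  // security.inter.broker.protocol. A Seq of pairs keeps declaration order,
  // which "first matching key" depends on.
  def resolveInterBrokerListener(
      listenerSecurityProtocolMap: Seq[(String, String)],
      interBrokerProtocol: String): Option[String] =
    listenerSecurityProtocolMap.collectFirst {
      case (listenerName, protocol) if protocol == interBrokerProtocol => listenerName
    }

  // Suggestion 2: prefer listener.name.{listenerName}.num.network.threads and
  // fall back to the broker-wide num.network.threads (assumed default: 3).
  def networkThreadsFor(listenerName: String, props: Map[String, String]): Int = {
    val key = s"listener.name.${listenerName.toLowerCase(Locale.ROOT)}.num.network.threads"
    props.get(key).orElse(props.get("num.network.threads")).map(_.toInt).getOrElse(3)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical configuration used only for this demonstration.
    val protocolMap = Seq("INTERNAL" -> "SSL", "CLIENT" -> "SASL_SSL", "ADMIN" -> "SSL")
    val props = Map(
      "num.network.threads" -> "3",
      "listener.name.client.num.network.threads" -> "8")

    println(resolveInterBrokerListener(protocolMap, "SSL")) // Some(INTERNAL): first SSL listener
    println(networkThreadsFor("CLIENT", props))             // 8: listener-specific override
    println(networkThreadsFor("ADMIN", props))              // 3: broker-wide value
  }
}
```

Returning an Option leaves the caller free to raise a ConfigException when no listener uses the requested protocol, rather than silently inventing a listener name.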

> Broker parameter optimization: security.inter.broker.protocol and num.network.threads
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13437
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: RivenSun
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.1#820001)