You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/22 08:02:50 UTC

[GitHub] [kafka] showuon commented on a diff in pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6

showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r927372305


##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: $listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case (_, eps) => eps }.toList
+
+      checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, lets allow duplicate ports if the host is on IPv4 and the other is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they are valid
+      duplicatePortsPartitionedByValidIps.foreach{

Review Comment:
   nit: space after `foreach`



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: $listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case (_, eps) => eps }.toList
+
+      checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, lets allow duplicate ports if the host is on IPv4 and the other is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map{

Review Comment:
   nit: need a space after `map`



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: $listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")

Review Comment:
   Before this PR, we'll validate if **all endpoint listenerName** are distinct. But after this PR, we only validate **partial endpoint listenerName** are distinct. It will cause some error cannot be caught. Ex:
   ```
   props.put(KafkaConfig.ListenersProp, "SSL://[::1]:9096,PLAINTEXT://127.0.0.1:9096,SSL://127.0.0.1:9097")
   ```
   This props will fail before this PR, because there are 2 `SSL` listener names. But in this PR, we'll group the first two endpoints first and validate them. Later, validate the last one. And it'll pass validation, which is unexpected. Please fix it and add a test for it. 



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: $listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case (_, eps) => eps }.toList
+
+      checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, lets allow duplicate ports if the host is on IPv4 and the other is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they are valid
+      duplicatePortsPartitionedByValidIps.foreach{
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          checkDuplicateListenerNames(duplicatesWithoutIpHosts, listeners)

Review Comment:
   We validate `duplicatesWithIpHosts` and `duplicatesWithoutIpHosts` here, so it will allow props like this:
   ```
   props.put(KafkaConfig.ListenersProp, "SSL://[::1]:9096,PLAINTEXT://127.0.0.1:9096,CONTROLLER://:9096")
   ```
   It that expected?



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: $listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case (_, eps) => eps }.toList
+
+      checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, lets allow duplicate ports if the host is on IPv4 and the other is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they are valid
+      duplicatePortsPartitionedByValidIps.foreach{
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          checkDuplicateListenerNames(duplicatesWithoutIpHosts, listeners)
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+          duplicatesWithIpHosts match {
+            case s if s.isEmpty =>
+            case Seq(one, two) =>
+              if (requireDistinctPorts)
+                require(validateOneIsIpv4AndOtherIpv6(one.host, two.host), "If you have two listeners on " +
+                  s"the same port then one needs to be IPv4 and the other IPv6, listeners: $listeners, port: $port")
+            case other =>
+              checkDuplicateListenerNames(other, listeners)

Review Comment:
   We can move this listenerNames check below `if (requireDistinctPorts)` condition, so that we can throw exception directly if `requireDistinctPorts` true, right?



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -741,12 +741,15 @@ object KafkaConfig {
   /** ********* Socket Server Configuration ***********/
   val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and the listener names." +
     s" If the listener name is not a security protocol, <code>$ListenerSecurityProtocolMapProp</code> must also be set.\n" +
-    " Listener names and port numbers must be unique.\n" +
+    " Listener names and port numbers must be unique unless \n" +
+    " one listener is an IPv4 address and the other listener is \n" +
+    " an IPv6 address (for the same port).\n" +

Review Comment:
   nit: I think "for the same port" can be removed since that is already mentioned in the beginning. WDYT?



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: $listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case (_, eps) => eps }.toList
+
+      checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, lets allow duplicate ports if the host is on IPv4 and the other is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they are valid
+      duplicatePortsPartitionedByValidIps.foreach{
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          checkDuplicateListenerNames(duplicatesWithoutIpHosts, listeners)
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+          duplicatesWithIpHosts match {
+            case s if s.isEmpty =>

Review Comment:
   nit: What does `s` mean here? 



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: $listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case (_, eps) => eps }.toList
+
+      checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, lets allow duplicate ports if the host is on IPv4 and the other is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)

Review Comment:
   Should we throw exception when the `host` is invalid? Currently, we can allow invalid host pass `validate` method, since we only check duplicate listenerNames and ports below. Is that correct?



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: $listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case (_, eps) => eps }.toList
+
+      checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, lets allow duplicate ports if the host is on IPv4 and the other is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they are valid
+      duplicatePortsPartitionedByValidIps.foreach{
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          checkDuplicateListenerNames(duplicatesWithoutIpHosts, listeners)
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+          duplicatesWithIpHosts match {
+            case s if s.isEmpty =>
+            case Seq(one, two) =>

Review Comment:
   nit: rename to `Seq(ep1, ep2)` ?



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: $listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case (_, eps) => eps }.toList
+
+      checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, lets allow duplicate ports if the host is on IPv4 and the other is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they are valid
+      duplicatePortsPartitionedByValidIps.foreach{
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          checkDuplicateListenerNames(duplicatesWithoutIpHosts, listeners)
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+          duplicatesWithIpHosts match {
+            case s if s.isEmpty =>
+            case Seq(one, two) =>
+              if (requireDistinctPorts)
+                require(validateOneIsIpv4AndOtherIpv6(one.host, two.host), "If you have two listeners on " +
+                  s"the same port then one needs to be IPv4 and the other IPv6, listeners: $listeners, port: $port")
+            case other =>

Review Comment:
   Could we rename this variable? ex: `allEps`?



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: $listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case (_, eps) => eps }.toList
+
+      checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, lets allow duplicate ports if the host is on IPv4 and the other is on IPv6

Review Comment:
   nit: // allow duplicate ports if one host is on IPv4 and the other one is on IPv6



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org