You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/08 12:11:26 UTC

kafka git commit: KAFKA-4260; Disallow non-routable address in advertised.listeners

Repository: kafka
Updated Branches:
  refs/heads/trunk 6546f9af6 -> ef17d5b89


KAFKA-4260; Disallow non-routable address in advertised.listeners

As described in the JIRA ticket, when `listeners=PLAINTEXT://0.0.0.0:9092`
(note the 0.0.0.0 "bind all interfaces" IP address) is configured and
`advertised.listeners` is not specified, the latter defaults to `listeners`.
However, it makes no sense to advertise 0.0.0.0, as it is not a routable IP
address.

This patch checks for a 0.0.0.0 host in `advertised.listeners`
(whether set explicitly or inherited by default from `listeners`) and
fails with a meaningful error if one is found.

This contribution is my original work and I license the work to the
project under the project's open source license.

Author: Tom Bentley <tb...@redhat.com>

Reviewers: Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #3382 from tombentley/advertised.listeners


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef17d5b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef17d5b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef17d5b8

Branch: refs/heads/trunk
Commit: ef17d5b892fa34811c805cac366744040e6d6efb
Parents: 6546f9a
Author: Tom Bentley <tb...@redhat.com>
Authored: Fri Sep 8 13:10:06 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Sep 8 13:10:13 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaConfig.scala          | 8 ++++++--
 core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala | 8 ++++++++
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ef17d5b8/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 89ba641..de6559c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -452,9 +452,10 @@ object KafkaConfig {
   "The port to publish to ZooKeeper for clients to use. In IaaS environments, this may " +
   "need to be different from the port to which the broker binds. If this is not set, " +
   "it will publish the same port that the broker binds to."
-  val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." +
+  val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the `listeners` config property." +
   " In IaaS environments, this may need to be different from the interface to which the broker binds." +
-  " If this is not set, the value for `listeners` will be used."
+  " If this is not set, the value for `listeners` will be used." +
+  " Unlike `listeners` it is not valid to advertise the 0.0.0.0 meta-address."
   val ListenerSecurityProtocolMapDoc = "Map between listener names and security protocols. This must be defined for " +
     "the same security protocol to be usable in more than one port or IP. For example, we can separate internal and " +
     "external traffic even if SSL is required for both. Concretely, we could define listeners with names INTERNAL " +
@@ -1190,6 +1191,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
       s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
       s"are ${listenerNames.map(_.value).mkString(",")}"
     )
+    require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
+      s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
+      s"Use a routable IP address.")
     require(interBrokerProtocolVersion >= logMessageFormatVersion,
       s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
     val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef17d5b8/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index dee6e87..08a45b3 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -718,6 +718,14 @@ class KafkaConfigTest {
     assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)
   }
 
+  @Test
+  def testNonroutableAdvertisedListeners() {
+    val props = new Properties()
+    props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092")
+    assertFalse(isValidKafkaConfig(props))
+  }
+
   private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) {
     values.foreach((value) => {
       val props = validRequiredProps