You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/08 12:11:26 UTC
kafka git commit: KAFKA-4260; Disallow non-routable address in advertised.listeners
Repository: kafka
Updated Branches:
refs/heads/trunk 6546f9af6 -> ef17d5b89
KAFKA-4260; Disallow non-routable address in advertised.listeners
As described in the JIRA ticket, when `listeners=PLAINTEXT://0.0.0.0:9092`
is configured (note the 0.0.0.0 "bind all interfaces" IP address) and
`advertised.listeners` is not specified, `advertised.listeners` defaults to
the value of `listeners`. However, it makes no sense to advertise 0.0.0.0,
because it is not a routable IP address.
This patch checks for a 0.0.0.0 host in `advertised.listeners`
(whether set explicitly or inherited by default from `listeners`) and
fails with a meaningful error if such a host is found.
This contribution is my original work and I license the work to the
project under the project's open source license.
Author: Tom Bentley <tb...@redhat.com>
Reviewers: Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #3382 from tombentley/advertised.listeners
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef17d5b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef17d5b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef17d5b8
Branch: refs/heads/trunk
Commit: ef17d5b892fa34811c805cac366744040e6d6efb
Parents: 6546f9a
Author: Tom Bentley <tb...@redhat.com>
Authored: Fri Sep 8 13:10:06 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Sep 8 13:10:13 2017 +0100
----------------------------------------------------------------------
core/src/main/scala/kafka/server/KafkaConfig.scala | 8 ++++++--
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala | 8 ++++++++
2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef17d5b8/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 89ba641..de6559c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -452,9 +452,10 @@ object KafkaConfig {
"The port to publish to ZooKeeper for clients to use. In IaaS environments, this may " +
"need to be different from the port to which the broker binds. If this is not set, " +
"it will publish the same port that the broker binds to."
- val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." +
+ val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the `listeners` config property." +
" In IaaS environments, this may need to be different from the interface to which the broker binds." +
- " If this is not set, the value for `listeners` will be used."
+ " If this is not set, the value for `listeners` will be used." +
+ " Unlike `listeners` it is not valid to advertise the 0.0.0.0 meta-address."
val ListenerSecurityProtocolMapDoc = "Map between listener names and security protocols. This must be defined for " +
"the same security protocol to be usable in more than one port or IP. For example, we can separate internal and " +
"external traffic even if SSL is required for both. Concretely, we could define listeners with names INTERNAL " +
@@ -1190,6 +1191,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
s"are ${listenerNames.map(_.value).mkString(",")}"
)
+ require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
+ s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
+ s"Use a routable IP address.")
require(interBrokerProtocolVersion >= logMessageFormatVersion,
s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
http://git-wip-us.apache.org/repos/asf/kafka/blob/ef17d5b8/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index dee6e87..08a45b3 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -718,6 +718,14 @@ class KafkaConfigTest {
assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)
}
+ @Test
+ def testNonroutableAdvertisedListeners() {
+ val props = new Properties()
+ props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092")
+ assertFalse(isValidKafkaConfig(props))
+ }
+
private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) {
values.foreach((value) => {
val props = validRequiredProps