You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/11 15:49:55 UTC
[kafka] branch trunk updated: KAFKA-6394;
Add a check to prevent misconfiguration of advertised listeners
(#4897)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ec7ba32 KAFKA-6394; Add a check to prevent misconfiguration of advertised listeners (#4897)
ec7ba32 is described below
commit ec7ba32af6542c6dbbf264a79804d25a98707971
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Fri May 11 21:19:49 2018 +0530
KAFKA-6394; Add a check to prevent misconfiguration of advertised listeners (#4897)
Do not allow server startup if one of its configured advertised listeners has already been registered by another broker.
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 1 -
core/src/main/scala/kafka/server/KafkaServer.scala | 8 +++-
.../server/DynamicBrokerReconfigurationTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaServerTest.scala | 49 ++++++++++++++++++++++
4 files changed, 57 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index e296e26..4069b8e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,7 +37,6 @@ import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.server.quota.ClientQuotaCallback
import scala.collection.JavaConverters._
import scala.collection.Map
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c729c8c..ebbc0b8 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
-import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internal.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
@@ -378,6 +377,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
private[server] def createBrokerInfo: BrokerInfo = {
+ val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}")
+ zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker =>
+ val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints)
+ require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" +
+ s" advertised listeners are already registered by broker ${broker.id}")
+ }
+
val listeners = config.advertisedListeners.map { endpoint =>
if (endpoint.port == 0)
endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 8b70875..fb96f9d 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -701,7 +701,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
// Ensure connections are made to brokers before external listener is made inaccessible
describeConfig(externalAdminClient)
- // Update broker keystore for external listener to use invalid listener address
+ // Update broker external listener to use invalid listener address
// any address other than localhost is sufficient to fail (either connection or host name verification failure)
val invalidHost = "192.168.0.1"
alterAdvertisedListener(adminClient, externalAdminClient, "localhost", invalidHost)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
new file mode 100755
index 0000000..d78821a
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Test
+
+class KafkaServerTest extends ZooKeeperTestHarness {
+
+ @Test
+ def testAlreadyRegisteredAdvertisedListeners() {
+ //start a server with a advertised listener
+ val server1 = createServer(1, "myhost", TestUtils.RandomPort)
+
+ //start a server with same advertised listener
+ intercept[IllegalArgumentException] {
+ createServer(2, "myhost", TestUtils.boundPort(server1))
+ }
+
+ //start a server with same host but with different port
+ val server2 = createServer(2, "myhost", TestUtils.RandomPort)
+
+ TestUtils.shutdownServers(Seq(server1, server2))
+ }
+
+ def createServer(nodeId: Int, hostName: String, port: Int): KafkaServer = {
+ val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
+ props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$hostName:$port")
+ val kafkaConfig = KafkaConfig.fromProps(props)
+ TestUtils.createServer(kafkaConfig)
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.