You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/11 15:49:55 UTC

[kafka] branch trunk updated: KAFKA-6394; Add a check to prevent misconfiguration of advertised listeners (#4897)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ec7ba32  KAFKA-6394; Add a check to prevent misconfiguration of advertised listeners (#4897)
ec7ba32 is described below

commit ec7ba32af6542c6dbbf264a79804d25a98707971
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Fri May 11 21:19:49 2018 +0530

    KAFKA-6394; Add a check to prevent misconfiguration of advertised listeners (#4897)
    
    Do not allow server startup if one of its configured advertised listeners has already been registered by another broker.
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |  1 -
 core/src/main/scala/kafka/server/KafkaServer.scala |  8 +++-
 .../server/DynamicBrokerReconfigurationTest.scala  |  2 +-
 .../scala/unit/kafka/server/KafkaServerTest.scala  | 49 ++++++++++++++++++++++
 4 files changed, 57 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index e296e26..4069b8e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,7 +37,6 @@ import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.server.quota.ClientQuotaCallback
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c729c8c..ebbc0b8 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
-import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
@@ -378,6 +377,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   }
 
   private[server] def createBrokerInfo: BrokerInfo = {
+    val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}")
+    zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker =>
+      val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints)
+      require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" +
+        s" advertised listeners are already registered by broker ${broker.id}")
+    }
+
     val listeners = config.advertisedListeners.map { endpoint =>
       if (endpoint.port == 0)
         endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 8b70875..fb96f9d 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -701,7 +701,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     // Ensure connections are made to brokers before external listener is made inaccessible
     describeConfig(externalAdminClient)
 
-    // Update broker keystore for external listener to use invalid listener address
+    // Update broker external listener to use invalid listener address
     // any address other than localhost is sufficient to fail (either connection or host name verification failure)
     val invalidHost = "192.168.0.1"
     alterAdvertisedListener(adminClient, externalAdminClient, "localhost", invalidHost)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
new file mode 100755
index 0000000..d78821a
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Test
+
+class KafkaServerTest extends ZooKeeperTestHarness {
+
+  @Test
+  def testAlreadyRegisteredAdvertisedListeners() {
+    // Start a first server with an advertised listener on "myhost" (port chosen via TestUtils.RandomPort)
+    val server1 = createServer(1, "myhost", TestUtils.RandomPort)
+
+    // Starting a second server with the same advertised host and port is expected to throw
+    intercept[IllegalArgumentException] {
+      createServer(2, "myhost", TestUtils.boundPort(server1))
+    }
+
+    // Start a server with the same host but a different port; no exception is expected here
+    val server2 = createServer(2, "myhost", TestUtils.RandomPort)
+
+    TestUtils.shutdownServers(Seq(server1, server2))
+  }
+
+  def createServer(nodeId: Int, hostName: String, port: Int): KafkaServer = {
+    val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
+    props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$hostName:$port")
+    val kafkaConfig = KafkaConfig.fromProps(props)
+    TestUtils.createServer(kafkaConfig)
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.