You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/02/04 00:42:27 UTC

kafka git commit: KAFKA-3194: Validate security.inter.broker.protocol against the adver…

Repository: kafka
Updated Branches:
  refs/heads/trunk 79eacf6c9 -> 5b5869383


KAFKA-3194: Validate security.inter.broker.protocol against the adver…

…tised.listeners protocols

Author: Grant Henke <gr...@gmail.com>

Reviewers: Ismael Juma

Closes #851 from granthenke/verify-protocol


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b586938
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b586938
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b586938

Branch: refs/heads/trunk
Commit: 5b5869383832512d78e20183c855f83d30a5ab37
Parents: 79eacf6
Author: Grant Henke <gr...@gmail.com>
Authored: Wed Feb 3 15:42:22 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Feb 3 15:42:22 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaConfig.scala   |  7 +++++
 .../unit/kafka/network/SocketServerTest.scala   |  4 +--
 .../unit/kafka/server/KafkaConfigTest.scala     | 28 ++++++++++++++++++++
 3 files changed, 37 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5b586938/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 00bf0cb..2c6311c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -951,6 +951,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
       "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
+    require(advertisedListeners.keySet.contains(interBrokerSecurityProtocol),
+      s"${KafkaConfig.InterBrokerSecurityProtocolProp} must be a protocol in the configured set of ${KafkaConfig.AdvertisedListenersProp}. " +
+      s"The valid options based on currently configured protocols are ${advertisedListeners.keySet}")
+    require(advertisedListeners.keySet.subsetOf(listeners.keySet),
+      s"${KafkaConfig.AdvertisedListenersProp} protocols must be equal to or a subset of ${KafkaConfig.ListenersProp} protocols. " +
+      s"Found ${advertisedListeners.keySet}. The valid options based on currently configured protocols are ${listeners.keySet}"
+    )
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b586938/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b4ba027..d94c314 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -233,9 +233,9 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def testSslSocketServer(): Unit = {
     val trustStoreFile = File.createTempFile("truststore", ".jks")
-    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, enableSsl = true,
+    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
       trustStoreFile = Some(trustStoreFile))
-    overrideProps.put("listeners", "SSL://localhost:0")
+    overrideProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
 
     val serverMetrics = new Metrics
     val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, new SystemTime)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b586938/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e8ffb5b..2479b37 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -393,6 +393,34 @@ class KafkaConfigTest {
   }
 
   @Test
+  def testInvalidInterBrokerSecurityProtocol() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
+    props.put(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.PLAINTEXT.toString)
+    intercept[IllegalArgumentException] {
+      KafkaConfig.fromProps(props)
+    }
+  }
+
+  @Test
+  def testEqualAdvertisedListenersProtocol() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def testInvalidAdvertisedListenersProtocol() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
+    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+    intercept[IllegalArgumentException] {
+      KafkaConfig.fromProps(props)
+    }
+  }
+
+  @Test
   def testFromPropsInvalid() {
     def getBaseProperties(): Properties = {
       val validRequiredProperties = new Properties()