You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/02/04 00:42:27 UTC
kafka git commit: KAFKA-3194: Validate security.inter.broker.protocol against the adver…
Repository: kafka
Updated Branches:
refs/heads/trunk 79eacf6c9 -> 5b5869383
KAFKA-3194: Validate security.inter.broker.protocol against the adver…
…tised.listeners protocols
Author: Grant Henke <gr...@gmail.com>
Reviewers: Ismael Juma
Closes #851 from granthenke/verify-protocol
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b586938
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b586938
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b586938
Branch: refs/heads/trunk
Commit: 5b5869383832512d78e20183c855f83d30a5ab37
Parents: 79eacf6
Author: Grant Henke <gr...@gmail.com>
Authored: Wed Feb 3 15:42:22 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Feb 3 15:42:22 2016 -0800
----------------------------------------------------------------------
.../main/scala/kafka/server/KafkaConfig.scala | 7 +++++
.../unit/kafka/network/SocketServerTest.scala | 4 +--
.../unit/kafka/server/KafkaConfigTest.scala | 28 ++++++++++++++++++++
3 files changed, 37 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b586938/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 00bf0cb..2c6311c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -951,6 +951,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
"offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
" Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
+ require(advertisedListeners.keySet.contains(interBrokerSecurityProtocol),
+ s"${KafkaConfig.InterBrokerSecurityProtocolProp} must be a protocol in the configured set of ${KafkaConfig.AdvertisedListenersProp}. " +
+ s"The valid options based on currently configured protocols are ${advertisedListeners.keySet}")
+ require(advertisedListeners.keySet.subsetOf(listeners.keySet),
+ s"${KafkaConfig.AdvertisedListenersProp} protocols must be equal to or a subset of ${KafkaConfig.ListenersProp} protocols. " +
+ s"Found ${advertisedListeners.keySet}. The valid options based on currently configured protocols are ${listeners.keySet}"
+ )
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b586938/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b4ba027..d94c314 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -233,9 +233,9 @@ class SocketServerTest extends JUnitSuite {
@Test
def testSslSocketServer(): Unit = {
val trustStoreFile = File.createTempFile("truststore", ".jks")
- val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, enableSsl = true,
+ val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
trustStoreFile = Some(trustStoreFile))
- overrideProps.put("listeners", "SSL://localhost:0")
+ overrideProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
val serverMetrics = new Metrics
val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, new SystemTime)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b586938/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e8ffb5b..2479b37 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -393,6 +393,34 @@ class KafkaConfigTest {
}
@Test
+ def testInvalidInterBrokerSecurityProtocol() {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ props.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
+ props.put(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.PLAINTEXT.toString)
+ intercept[IllegalArgumentException] {
+ KafkaConfig.fromProps(props)
+ }
+ }
+
+ @Test
+ def testEqualAdvertisedListenersProtocol() {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+ props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+ KafkaConfig.fromProps(props)
+ }
+
+ @Test
+ def testInvalidAdvertisedListenersProtocol() {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ props.put(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
+ props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+ intercept[IllegalArgumentException] {
+ KafkaConfig.fromProps(props)
+ }
+ }
+
+ @Test
def testFromPropsInvalid() {
def getBaseProperties(): Properties = {
val validRequiredProperties = new Properties()