You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/05/03 20:09:00 UTC
kafka git commit: KAFKA-4703: Test with two SASL_SSL listeners with
different JAAS contexts
Repository: kafka
Updated Branches:
refs/heads/trunk 8d7492016 -> a7671c7f3
KAFKA-4703: Test with two SASL_SSL listeners with different JAAS contexts
Tests broker with multiple SASL mechanisms with different endpoints for different mechanisms. Each endpoint uses its own JAAS context.
Author: Balint Molnar <ba...@gmail.com>
Reviewers: Rajini Sivaram, Ismael Juma
Closes #2506 from baluchicken/KAFKA-4703
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a7671c7f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a7671c7f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a7671c7f
Branch: refs/heads/trunk
Commit: a7671c7f3723113e716b99471e7be3499fde1b15
Parents: 8d74920
Author: Balint Molnar <ba...@gmail.com>
Authored: Wed May 3 21:08:30 2017 +0100
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Wed May 3 21:08:30 2017 +0100
----------------------------------------------------------------------
.../api/SaslEndToEndAuthorizationTest.scala | 2 +-
.../scala/integration/kafka/api/SaslSetup.scala | 47 ++++--
...ListenersWithAdditionalJaasContextTest.scala | 47 ++++++
...pleListenersWithDefaultJaasContextTest.scala | 37 +++++
...tenersWithSameSecurityProtocolBaseTest.scala | 158 +++++++++++++++++++
...eListenersWithSameSecurityProtocolTest.scala | 132 ----------------
.../security/auth/ZkAuthorizationTest.scala | 2 +-
.../scala/unit/kafka/utils/JaasTestUtils.scala | 32 +---
.../scala/unit/kafka/zk/ZKEphemeralTest.scala | 2 +-
9 files changed, 283 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index dd91627..d4c417c 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.errors.GroupAuthorizationException
-import org.junit.{Before,Test}
+import org.junit.{Before, Test}
import scala.collection.immutable.List
import scala.collection.JavaConverters._
http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 29aea61..13ed2e2 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -20,8 +20,10 @@ package kafka.api
import java.io.File
import java.util.Properties
import javax.security.auth.login.Configuration
+
import kafka.security.minikdc.MiniKdc
import kafka.server.KafkaConfig
+import kafka.utils.JaasTestUtils.JaasSection
import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.authenticator.LoginManager
@@ -29,12 +31,13 @@ import org.apache.kafka.common.config.SaslConfigs
/*
* Implements an enumeration for the modes enabled here:
- * zk only, kafka only, both.
+ * zk only, kafka only, both, custom KafkaServer.
*/
sealed trait SaslSetupMode
case object ZkSasl extends SaslSetupMode
case object KafkaSasl extends SaslSetupMode
case object Both extends SaslSetupMode
+case object CustomKafkaServerSasl extends SaslSetupMode
/*
* Trait used in SaslTestHarness and EndToEndAuthorizationTest to setup keytab and jaas files.
@@ -43,11 +46,13 @@ trait SaslSetup {
private val workDir = TestUtils.tempDir()
private val kdcConf = MiniKdc.createConfig
private var kdc: MiniKdc = null
- private var serverKeytabFile: Option[File] = null
- private var clientKeytabFile: Option[File] = null
+ private var serverKeytabFile: Option[File] = None
+ private var clientKeytabFile: Option[File] = None
+ private var jaasContext: Seq[JaasSection] = Seq()
def startSasl(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String],
- mode: SaslSetupMode = Both, kafkaServerJaasEntryName: String = JaasTestUtils.KafkaServerContextName) {
+ mode: SaslSetupMode = Both, kafkaServerJaasEntryName: String = JaasTestUtils.KafkaServerContextName,
+ withDefaultJaasContext: Boolean = true) {
// Important if tests leak consumers, producers or brokers
LoginManager.closeAll()
val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanism == Some("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI"))
@@ -60,27 +65,39 @@ trait SaslSetup {
kdc.start()
kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost")
kdc.createPrincipal(clientKeytabFile, JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
- } else {
- this.clientKeytabFile = None
- this.serverKeytabFile = None
}
- setJaasConfiguration(mode, kafkaServerJaasEntryName, kafkaServerSaslMechanisms, kafkaClientSaslMechanism)
+ if (withDefaultJaasContext) {
+ setJaasConfiguration(mode, kafkaServerJaasEntryName, kafkaServerSaslMechanisms, kafkaClientSaslMechanism)
+ writeJaasConfigurationToFile()
+ } else
+ setJaasConfiguration(mode, kafkaServerJaasEntryName, kafkaServerSaslMechanisms, kafkaClientSaslMechanism)
if (mode == Both || mode == ZkSasl)
System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
}
protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerEntryName: String,
kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) {
- val jaasFile = mode match {
- case ZkSasl => JaasTestUtils.writeZkFile()
- case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerEntryName, kafkaServerSaslMechanisms,
- kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
- case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerEntryName, kafkaServerSaslMechanisms,
- kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
+ val jaasSection = mode match {
+ case ZkSasl => JaasTestUtils.zkSections
+ case KafkaSasl =>
+ Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms, serverKeytabFile),
+ JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism, clientKeytabFile))
+ case CustomKafkaServerSasl => Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName,
+ kafkaServerSaslMechanisms, serverKeytabFile))
+ case Both => Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, kafkaServerSaslMechanisms, serverKeytabFile),
+ JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism, clientKeytabFile)) ++ JaasTestUtils.zkSections
}
+ jaasContext = jaasContext ++ jaasSection
+ }
+
+ protected def writeJaasConfigurationToFile() {
// This will cause a reload of the Configuration singleton when `getConfiguration` is called
Configuration.setConfiguration(null)
- System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
+ System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, JaasTestUtils.writeJaasContextsToFile(jaasContext))
+ }
+
+ protected def removeJaasSection(context: String) {
+ jaasContext = jaasContext.filter(_.contextName != context)
}
def closeSasl() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
new file mode 100644
index 0000000..3251be0
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.Properties
+
+import kafka.api.CustomKafkaServerSasl
+import org.apache.kafka.common.network.ListenerName
+
+
+class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest{
+
+ import MultipleListenersWithSameSecurityProtocolBaseTest._
+
+ override def setSaslProperties(listenerName: ListenerName): Option[Properties] = {
+
+ val gssapiSaslProperties = kafkaClientSaslProperties(GssApi, dynamicJaasConfig = true)
+ val plainSaslProperties = kafkaClientSaslProperties(Plain, dynamicJaasConfig = true)
+
+ listenerName.value match {
+ case SecureInternal => Some(plainSaslProperties)
+ case SecureExternal => Some(gssapiSaslProperties)
+ case _ => None
+ }
+ }
+
+ override def addJaasSection(): Unit = {
+ setJaasConfiguration(CustomKafkaServerSasl, "secure_external.KafkaServer", List(GssApi), None)
+ setJaasConfiguration(CustomKafkaServerSasl, "secure_internal.KafkaServer", List(Plain), None)
+ removeJaasSection("KafkaServer")
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
new file mode 100644
index 0000000..8291d82
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.Properties
+
+import org.apache.kafka.common.network.ListenerName
+
+
+class MultipleListenersWithDefaultJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
+
+ import MultipleListenersWithSameSecurityProtocolBaseTest._
+
+ override def setSaslProperties(listenerName: ListenerName): Option[Properties] = {
+ val plainSaslProperties = kafkaClientSaslProperties(Plain, dynamicJaasConfig = true)
+
+ listenerName.value match {
+ case SecureExternal | SecureInternal => Some(plainSaslProperties)
+ case _ => None
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
new file mode 100644
index 0000000..9765279
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.File
+import java.util.{Collections, Properties}
+import java.util.concurrent.TimeUnit
+
+import kafka.api.{Both, SaslSetup}
+import kafka.common.Topic
+import kafka.coordinator.group.OffsetConfig
+import kafka.utils.{CoreUtils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.Assert.assertEquals
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+
+object MultipleListenersWithSameSecurityProtocolBaseTest {
+ val SecureInternal = "SECURE_INTERNAL"
+ val SecureExternal = "SECURE_EXTERNAL"
+ val Internal = "INTERNAL"
+ val External = "EXTERNAL"
+ val GssApi = "GSSAPI"
+ val Plain = "PLAIN"
+}
+
+abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeeperTestHarness with SaslSetup{
+
+ import MultipleListenersWithSameSecurityProtocolBaseTest._
+
+ private val trustStoreFile = File.createTempFile("truststore", ".jks")
+ private val servers = new ArrayBuffer[KafkaServer]
+ private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], Array[Byte]]]()
+ private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], Array[Byte]]]()
+ private val kafkaClientSaslMechanism = Plain
+ private val kafkaServerSaslMechanisms = List(GssApi, Plain)
+
+ protected def setSaslProperties(listenerName: ListenerName): Option[Properties]
+ protected def addJaasSection(): Unit = {}
+
+ @Before
+ override def setUp(): Unit = {
+ startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, withDefaultJaasContext = false)
+ addJaasSection()
+ writeJaasConfigurationToFile()
+ super.setUp()
+ // 2 brokers so that we can test that the data propagates correctly via UpdateMetadadaRequest
+ val numServers = 2
+
+ (0 until numServers).foreach { brokerId =>
+
+ val props = TestUtils.createBrokerConfig(brokerId, zkConnect, trustStoreFile = Some(trustStoreFile))
+ // Ensure that we can support multiple listeners per security protocol and multiple security protocols
+ props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $Internal://localhost:0, " +
+ s"$SecureExternal://localhost:0, $External://localhost:0")
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$Internal:PLAINTEXT, $SecureInternal:SASL_SSL," +
+ s"$External:PLAINTEXT, $SecureExternal:SASL_SSL")
+ props.put(KafkaConfig.InterBrokerListenerNameProp, Internal)
+ props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, kafkaClientSaslMechanism)
+ props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
+ props.put(KafkaConfig.SaslKerberosServiceNameProp, "kafka")
+
+ props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId"))
+
+ // set listener-specific configs and set an invalid path for the global config to verify that the overrides work
+ Seq(SecureInternal, SecureExternal).foreach { listenerName =>
+ props.put(new ListenerName(listenerName).configPrefix + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+ props.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+ }
+ props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")
+
+ servers += TestUtils.createServer(KafkaConfig.fromProps(props))
+ }
+
+ val serverConfig = servers.head.config
+ assertEquals(4, serverConfig.listeners.size)
+
+ TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName, OffsetConfig.DefaultOffsetsTopicNumPartitions,
+ replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
+
+ serverConfig.listeners.foreach { endPoint =>
+ val listenerName = endPoint.listenerName
+
+ TestUtils.createTopic(zkUtils, listenerName.value, 2, 2, servers)
+
+ val trustStoreFile =
+ if (endPoint.securityProtocol == SecurityProtocol.SASL_SSL) Some(this.trustStoreFile)
+ else None
+
+ val saslProperties = setSaslProperties(listenerName)
+
+ val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName)
+
+ producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, acks = -1,
+ securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProperties)
+
+ consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, groupId = listenerName.value,
+ securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProperties)
+ }
+ }
+
+ @After
+ override def tearDown() {
+ producers.values.foreach(_.close())
+ consumers.values.foreach(_.close())
+ servers.foreach { s =>
+ s.shutdown()
+ CoreUtils.delete(s.config.logDirs)
+ }
+ super.tearDown()
+ }
+
+ /**
+ * Tests that we can produce and consume to/from all broker-defined listeners and security protocols. We produce
+ * with acks=-1 to ensure that replication is also working.
+ */
+ @Test
+ def testProduceConsume(): Unit = {
+ producers.foreach { case (listenerName, producer) =>
+ val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes,
+ s"value$i".getBytes))
+ producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
+
+ val consumer = consumers(listenerName)
+ consumer.subscribe(Collections.singleton(listenerName.value))
+ val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
+ TestUtils.waitUntilTrue(() => {
+ records ++= consumer.poll(50).asScala
+ records.size == producerRecords.size
+ }, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
deleted file mode 100644
index ccc118c..0000000
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.io.File
-import java.util.Collections
-import java.util.concurrent.TimeUnit
-
-import kafka.common.Topic
-import kafka.coordinator.group.OffsetConfig
-import kafka.utils.{CoreUtils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.config.SslConfigs
-import org.apache.kafka.common.network.{ListenerName, Mode}
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.Assert.assertEquals
-import org.junit.{After, Before, Test}
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConverters._
-
-class MultipleListenersWithSameSecurityProtocolTest extends ZooKeeperTestHarness {
-
- private val trustStoreFile = File.createTempFile("truststore", ".jks")
- private val servers = new ArrayBuffer[KafkaServer]
- private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], Array[Byte]]]()
- private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], Array[Byte]]]()
-
- @Before
- override def setUp(): Unit = {
- super.setUp()
- // 2 brokers so that we can test that the data propagates correctly via UpdateMetadadaRequest
- val numServers = 2
-
- (0 until numServers).foreach { brokerId =>
-
- val props = TestUtils.createBrokerConfig(brokerId, zkConnect, trustStoreFile = Some(trustStoreFile))
- // Ensure that we can support multiple listeners per security protocol and multiple security protocols
- props.put(KafkaConfig.ListenersProp, "SECURE_INTERNAL://localhost:0, INTERNAL://localhost:0, " +
- "SECURE_EXTERNAL://localhost:0, EXTERNAL://localhost:0")
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "INTERNAL:PLAINTEXT, SECURE_INTERNAL:SSL," +
- "EXTERNAL:PLAINTEXT, SECURE_EXTERNAL:SSL")
- props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
- props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId"))
-
- // set listener-specific configs and set an invalid path for the global config to verify that the overrides work
- Seq("SECURE_INTERNAL", "SECURE_EXTERNAL").foreach { listenerName =>
- props.put(new ListenerName(listenerName).configPrefix + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
- props.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
- }
- props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")
-
- servers += TestUtils.createServer(KafkaConfig.fromProps(props))
- }
-
- val serverConfig = servers.head.config
- assertEquals(4, serverConfig.listeners.size)
-
- TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName, OffsetConfig.DefaultOffsetsTopicNumPartitions,
- replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
-
- serverConfig.listeners.foreach { endPoint =>
- val listenerName = endPoint.listenerName
-
- TestUtils.createTopic(zkUtils, listenerName.value, 2, 2, servers)
-
- val trustStoreFile =
- if (endPoint.securityProtocol == SecurityProtocol.SSL) Some(this.trustStoreFile)
- else None
-
- val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName)
-
- producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, acks = -1,
- securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile)
-
- consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, groupId = listenerName.value,
- securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile)
- }
- }
-
- @After
- override def tearDown() {
- producers.values.foreach(_.close())
- consumers.values.foreach(_.close())
- servers.foreach { s =>
- s.shutdown()
- CoreUtils.delete(s.config.logDirs)
- }
- super.tearDown()
- }
-
- /**
- * Tests that we can produce and consume to/from all broker-defined listeners and security protocols. We produce
- * with acks=-1 to ensure that replication is also working.
- */
- @Test
- def testProduceConsume(): Unit = {
- producers.foreach { case (listenerName, producer) =>
- val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes,
- s"value$i".getBytes))
- producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
-
- val consumer = consumers(listenerName)
- consumer.subscribe(Collections.singleton(listenerName.value))
- val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
- TestUtils.waitUntilTrue(() => {
- records ++= consumer.poll(50).asScala
- records.size == producerRecords.size
- }, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records")
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 3e7fce4..5ea92aa 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -30,7 +30,7 @@ import scala.util.{Try, Success, Failure}
import javax.security.auth.login.Configuration
class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
- val jaasFile = kafka.utils.JaasTestUtils.writeZkFile
+ val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
val authProvider = "zookeeper.authProvider.1"
@Before
http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 7b90abf..3ae680c 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -82,7 +82,7 @@ object JaasTestUtils {
}
}
- class JaasSection(contextName: String,
+ case class JaasSection(contextName: String,
jaasModule: Seq[JaasModule]) {
override def toString: String = {
s"""|$contextName {
@@ -122,29 +122,9 @@ object JaasTestUtils {
val KafkaScramAdmin = "scram-admin"
val KafkaScramAdminPassword = "scram-admin-secret"
- def writeZkFile(): String = {
+ def writeJaasContextsToFile(jaasContexts: Seq[JaasSection]): String = {
val jaasFile = TestUtils.tempFile()
- writeToFile(jaasFile, zkSections)
- jaasFile.getCanonicalPath
- }
-
- def writeKafkaFile(serverEntryName: String, kafkaServerSaslMechanisms: List[String],
- kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File],
- clientKeyTabLocation: Option[File]): String = {
- val jaasFile = TestUtils.tempFile()
- val kafkaSections = Seq(kafkaServerSection(serverEntryName, kafkaServerSaslMechanisms, serverKeyTabLocation),
- kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation))
- writeToFile(jaasFile, kafkaSections)
- jaasFile.getCanonicalPath
- }
-
- def writeZkAndKafkaFiles(serverEntryName: String, kafkaServerSaslMechanisms: List[String],
- kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File],
- clientKeyTabLocation: Option[File]): String = {
- val jaasFile = TestUtils.tempFile()
- val kafkaSections = Seq(kafkaServerSection(serverEntryName, kafkaServerSaslMechanisms, serverKeyTabLocation),
- kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation))
- writeToFile(jaasFile, kafkaSections ++ zkSections)
+ writeToFile(jaasFile,jaasContexts)
jaasFile.getCanonicalPath
}
@@ -152,12 +132,12 @@ object JaasTestUtils {
def clientLoginModule(mechanism: String, keytabLocation: Option[File]): String =
kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword).toString
- private def zkSections: Seq[JaasSection] = Seq(
+ def zkSections: Seq[JaasSection] = Seq(
new JaasSection(ZkServerContextName, Seq(JaasModule(ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))),
new JaasSection(ZkClientContextName, Seq(JaasModule(ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword))))
)
- private def kafkaServerSection(contextName: String, mechanisms: List[String], keytabLocation: Option[File]): JaasSection = {
+ def kafkaServerSection(contextName: String, mechanisms: List[String], keytabLocation: Option[File]): JaasSection = {
val modules = mechanisms.map {
case "GSSAPI" =>
Krb5LoginModule(
@@ -215,7 +195,7 @@ object JaasTestUtils {
/*
* Used for the static JAAS configuration and it uses the credentials for client#2
*/
- private def kafkaClientSection(mechanism: Option[String], keytabLocation: Option[File]): JaasSection = {
+ def kafkaClientSection(mechanism: Option[String], keytabLocation: Option[File]): JaasSection = {
new JaasSection(KafkaClientContextName, mechanism.map(m =>
kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, KafkaScramPassword2)).toSeq)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 4d57ed9..c9076b5 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -47,7 +47,7 @@ object ZKEphemeralTest {
@RunWith(value = classOf[Parameterized])
class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
- val jaasFile = kafka.utils.JaasTestUtils.writeZkFile()
+ val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
val authProvider = "zookeeper.authProvider.1"
var zkSessionTimeoutMs = 1000