You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:24 UTC
[08/50] [abbrv] kafka git commit: KAFKA-3475;
Introduce our own `MiniKdc`
KAFKA-3475; Introduce our own `MiniKdc`
This also fixes KAFKA-3453 and KAFKA-2866.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Gwen Shapira
Closes #1155 from ijuma/kafka-3475-introduce-our-minikdc
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/78d91dcd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/78d91dcd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/78d91dcd
Branch: refs/heads/0.10.0
Commit: 78d91dcd8805d850038df52718380a6f956abad7
Parents: 2788f2d
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Mar 30 19:30:34 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Mar 30 19:30:34 2016 -0700
----------------------------------------------------------------------
build.gradle | 15 +-
.../org/apache/kafka/common/utils/Utils.java | 2 +-
core/src/main/scala/kafka/utils/CoreUtils.scala | 15 +-
core/src/test/resources/minikdc-krb5.conf | 25 ++
core/src/test/resources/minikdc.ldiff | 47 ++
.../scala/integration/kafka/api/SaslSetup.scala | 10 +-
.../scala/kafka/security/minikdc/MiniKdc.scala | 433 +++++++++++++++++++
.../integration/KafkaServerTestHarness.scala | 12 +-
gradle/dependencies.gradle | 12 +-
tests/kafkatest/services/security/minikdc.py | 12 +-
10 files changed, 557 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 13a8b4e..d6f82a4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -338,7 +338,17 @@ project(':core') {
testCompile project(':clients').sourceSets.test.output
testCompile libs.bcpkix
testCompile libs.easymock
- testCompile libs.hadoopMiniKdc
+ testCompile(libs.apacheda) {
+ exclude group: 'xml-apis', module: 'xml-apis'
+ }
+ testCompile libs.apachedsCoreApi
+ testCompile libs.apachedsInterceptorKerberos
+ testCompile libs.apachedsProtocolShared
+ testCompile libs.apachedsProtocolKerberos
+ testCompile libs.apachedsProtocolLdap
+ testCompile libs.apachedsLdifPartition
+ testCompile libs.apachedsMavibotPartition
+ testCompile libs.apachedsJdbmPartition
testCompile libs.junit
testCompile libs.scalaTest
}
@@ -368,6 +378,9 @@ project(':core') {
duplicatesStrategy 'exclude'
}
+ systemTestLibs {
+ dependsOn testJar
+ }
task genProtocolErrorDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 0167548..2a98822 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -476,7 +476,7 @@ public class Utils {
* @param daemon Should the thread block JVM shutdown?
* @return The unstarted thread
*/
- public static Thread newThread(String name, Runnable runnable, Boolean daemon) {
+ public static Thread newThread(String name, Runnable runnable, boolean daemon) {
Thread thread = new Thread(runnable, name);
thread.setDaemon(daemon);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index fe2bebf..5b6c59f 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -57,13 +57,14 @@ object CoreUtils extends Logging {
}
/**
- * Create a daemon thread
- * @param name The name of the thread
- * @param fun The function to execute in the thread
- * @return The unstarted thread
- */
- def daemonThread(name: String, fun: => Unit): Thread =
- Utils.daemonThread(name, runnable(fun))
+ * Create a thread
+ * @param name The name of the thread
+ * @param daemon Whether the thread should block JVM shutdown
+ * @param fun The function to execute in the thread
+ * @return The unstarted thread
+ */
+ def newThread(name: String, daemon: Boolean)(fun: => Unit): Thread =
+ Utils.newThread(name, runnable(fun), daemon)
/**
* Do the given action and log any exceptions thrown without rethrowing them
http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/core/src/test/resources/minikdc-krb5.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/minikdc-krb5.conf b/core/src/test/resources/minikdc-krb5.conf
new file mode 100644
index 0000000..0603404
--- /dev/null
+++ b/core/src/test/resources/minikdc-krb5.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+[libdefaults]
+default_realm = {0}
+udp_preference_limit = 1
+
+[realms]
+{0} = '{'
+ kdc = {1}:{2}
+'}'
http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/core/src/test/resources/minikdc.ldiff
----------------------------------------------------------------------
diff --git a/core/src/test/resources/minikdc.ldiff b/core/src/test/resources/minikdc.ldiff
new file mode 100644
index 0000000..75e4dfd
--- /dev/null
+++ b/core/src/test/resources/minikdc.ldiff
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+dn: ou=users,dc=${0},dc=${1}
+objectClass: organizationalUnit
+objectClass: top
+ou: users
+
+dn: uid=krbtgt,ou=users,dc=${0},dc=${1}
+objectClass: top
+objectClass: person
+objectClass: inetOrgPerson
+objectClass: krb5principal
+objectClass: krb5kdcentry
+cn: KDC Service
+sn: Service
+uid: krbtgt
+userPassword: secret
+krb5PrincipalName: krbtgt/${2}.${3}@${2}.${3}
+krb5KeyVersionNumber: 0
+
+dn: uid=ldap,ou=users,dc=${0},dc=${1}
+objectClass: top
+objectClass: person
+objectClass: inetOrgPerson
+objectClass: krb5principal
+objectClass: krb5kdcentry
+cn: LDAP
+sn: Service
+uid: ldap
+userPassword: secret
+krb5PrincipalName: ldap/${4}@${2}.${3}
+krb5KeyVersionNumber: 0
http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index c36b288..8255e6a 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -17,11 +17,11 @@
package kafka.api
-import java.io.{File}
+import java.io.File
import javax.security.auth.login.Configuration
-import kafka.utils.{JaasTestUtils,TestUtils}
-import org.apache.hadoop.minikdc.MiniKdc
+import kafka.security.minikdc.MiniKdc
+import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.kerberos.LoginManager
@@ -39,8 +39,8 @@ case object Both extends SaslSetupMode
* currently to setup a keytab and jaas files.
*/
trait SaslSetup {
- private val workDir = new File(System.getProperty("test.dir", "build/tmp/test-workDir"))
- private val kdcConf = MiniKdc.createConf()
+ private val workDir = TestUtils.tempDir()
+ private val kdcConf = MiniKdc.createConfig
private val kdc = new MiniKdc(kdcConf, workDir)
def startSasl(mode: SaslSetupMode = Both) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
new file mode 100644
index 0000000..14807bc
--- /dev/null
+++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security.minikdc
+
+import java.io._
+import java.net.InetSocketAddress
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.text.MessageFormat
+import java.util.{Locale, Properties, UUID}
+
+import kafka.utils.{CoreUtils, Logging}
+
+import scala.collection.JavaConverters._
+import org.apache.commons.io.IOUtils
+import org.apache.commons.lang.text.StrSubstitutor
+import org.apache.directory.api.ldap.model.entry.{DefaultEntry, Entry}
+import org.apache.directory.api.ldap.model.ldif.LdifReader
+import org.apache.directory.api.ldap.model.name.Dn
+import org.apache.directory.api.ldap.schema.extractor.impl.DefaultSchemaLdifExtractor
+import org.apache.directory.api.ldap.schema.loader.LdifSchemaLoader
+import org.apache.directory.api.ldap.schema.manager.impl.DefaultSchemaManager
+import org.apache.directory.server.constants.ServerDNConstants
+import org.apache.directory.server.core.DefaultDirectoryService
+import org.apache.directory.server.core.api.{CacheService, DirectoryService, InstanceLayout}
+import org.apache.directory.server.core.api.schema.SchemaPartition
+import org.apache.directory.server.core.kerberos.KeyDerivationInterceptor
+import org.apache.directory.server.core.partition.impl.btree.jdbm.{JdbmIndex, JdbmPartition}
+import org.apache.directory.server.core.partition.ldif.LdifPartition
+import org.apache.directory.server.kerberos.KerberosConfig
+import org.apache.directory.server.kerberos.kdc.KdcServer
+import org.apache.directory.server.kerberos.shared.crypto.encryption.KerberosKeyFactory
+import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry}
+import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport}
+import org.apache.directory.server.xdbm.Index
+import org.apache.directory.shared.kerberos.KerberosTime
+import org.apache.kafka.common.utils.Utils
+
+/**
+ * Mini KDC based on Apache Directory Server that can be embedded in tests or used from command line as a standalone
+ * KDC.
+ *
+ * MiniKdc sets 2 System properties when started and unsets them when stopped:
+ *
+ * - java.security.krb5.conf: set to the MiniKDC real/host/port
+ * - sun.security.krb5.debug: set to the debug value provided in the configuration
+ *
+ * As a result of this, multiple MiniKdc instances should not be started concurrently in the same JVM.
+ *
+ * MiniKdc default configuration values are:
+ *
+ * - org.name=EXAMPLE (used to create the REALM)
+ * - org.domain=COM (used to create the REALM)
+ * - kdc.bind.address=localhost
+ * - kdc.port=0 (ephemeral port)
+ * - instance=DefaultKrbServer
+ * - max.ticket.lifetime=86400000 (1 day)
+ * - max.renewable.lifetime604800000 (7 days)
+ * - transport=TCP
+ * - debug=false
+ *
+ * The generated krb5.conf forces TCP connections.
+ *
+ * Acknowledgements: this class is derived from the MiniKdc class in the hadoop-minikdc project (git commit
+ * d8d8ed35f00b15ee0f2f8aaf3fe7f7b42141286b).
+ *
+ * @constructor creates a new MiniKdc instance.
+ * @param config the MiniKdc configuration
+ * @param workDir the working directory which will contain krb5.conf, Apache DS files and any other files needed by
+ * MiniKdc.
+ * @throws Exception thrown if the MiniKdc could not be created.
+ */
+class MiniKdc(config: Properties, workDir: File) extends Logging {
+
+ if (!config.keySet.containsAll(MiniKdc.RequiredProperties.asJava)) {
+ val missingProperties = MiniKdc.RequiredProperties.filterNot(config.keySet.asScala)
+ throw new IllegalArgumentException(s"Missing configuration properties: $missingProperties")
+ }
+
+ info("Configuration:")
+ info("---------------------------------------------------------------")
+ config.asScala.foreach { case (key, value) =>
+ info(s"\t$key: $value")
+ }
+ info("---------------------------------------------------------------")
+
+ private val orgName = config.getProperty(MiniKdc.OrgName)
+ private val orgDomain = config.getProperty(MiniKdc.OrgDomain)
+ private val dnString = s"dc=$orgName,dc=$orgDomain"
+ private val realm = s"${orgName.toUpperCase(Locale.ENGLISH)}.${orgDomain.toUpperCase(Locale.ENGLISH)}"
+ private val krb5conf = new File(workDir, "krb5.conf")
+
+ private var _port = config.getProperty(MiniKdc.KdcPort).toInt
+ private var ds: DirectoryService = null
+ private var kdc: KdcServer = null
+
+ def port: Int = _port
+
+ def host: String = config.getProperty(MiniKdc.KdcBindAddress)
+
+ def start() {
+ if (kdc != null)
+ throw new RuntimeException("KDC already started")
+ initDirectoryService()
+ initKdcServer()
+ initJvmKerberosConfig()
+ }
+
+ private def initDirectoryService() {
+ ds = new DefaultDirectoryService
+ ds.setInstanceLayout(new InstanceLayout(workDir))
+ ds.setCacheService(new CacheService)
+
+ // first load the schema
+ val instanceLayout = ds.getInstanceLayout
+ val schemaPartitionDirectory = new File(instanceLayout.getPartitionsDirectory, "schema")
+ val extractor = new DefaultSchemaLdifExtractor(instanceLayout.getPartitionsDirectory)
+ extractor.extractOrCopy
+
+ val loader = new LdifSchemaLoader(schemaPartitionDirectory)
+ val schemaManager = new DefaultSchemaManager(loader)
+ schemaManager.loadAllEnabled()
+ ds.setSchemaManager(schemaManager)
+ // Init the LdifPartition with schema
+ val schemaLdifPartition = new LdifPartition(schemaManager, ds.getDnFactory)
+ schemaLdifPartition.setPartitionPath(schemaPartitionDirectory.toURI)
+
+ // The schema partition
+ val schemaPartition = new SchemaPartition(schemaManager)
+ schemaPartition.setWrappedPartition(schemaLdifPartition)
+ ds.setSchemaPartition(schemaPartition)
+
+ val systemPartition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory)
+ systemPartition.setId("system")
+ systemPartition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, systemPartition.getId).toURI)
+ systemPartition.setSuffixDn(new Dn(ServerDNConstants.SYSTEM_DN))
+ systemPartition.setSchemaManager(ds.getSchemaManager)
+ ds.setSystemPartition(systemPartition)
+
+ ds.getChangeLog.setEnabled(false)
+ ds.setDenormalizeOpAttrsEnabled(true)
+ ds.addLast(new KeyDerivationInterceptor)
+
+ // create one partition
+ val orgName = config.getProperty(MiniKdc.OrgName).toLowerCase(Locale.ENGLISH)
+ val orgDomain = config.getProperty(MiniKdc.OrgDomain).toLowerCase(Locale.ENGLISH)
+ val partition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory)
+ partition.setId(orgName)
+ partition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, orgName).toURI)
+ val dn = new Dn(dnString)
+ partition.setSuffixDn(dn)
+ ds.addPartition(partition)
+
+ // indexes
+ val indexedAttributes = Set[Index[_, String]](
+ new JdbmIndex[Entry]("objectClass", false),
+ new JdbmIndex[Entry]("dc", false),
+ new JdbmIndex[Entry]("ou", false)
+ ).asJava
+ partition.setIndexedAttributes(indexedAttributes)
+
+ // And start the ds
+ ds.setInstanceId(config.getProperty(MiniKdc.Instance))
+ ds.startup()
+
+ // context entry, after ds.startup()
+ val entry = ds.newEntry(dn)
+ entry.add("objectClass", "top", "domain")
+ entry.add("dc", orgName)
+ ds.getAdminSession.add(entry)
+ }
+
+ private def initKdcServer() {
+
+ def addInitialEntriesToDirectoryService(bindAddress: String) {
+ val map = Map (
+ "0" -> orgName.toLowerCase(Locale.ENGLISH),
+ "1" -> orgDomain.toLowerCase(Locale.ENGLISH),
+ "2" -> orgName.toUpperCase(Locale.ENGLISH),
+ "3" -> orgDomain.toUpperCase(Locale.ENGLISH),
+ "4" -> bindAddress
+ )
+ val inputStream = MiniKdc.getResourceAsStream("minikdc.ldiff")
+ try addEntriesToDirectoryService(StrSubstitutor.replace(IOUtils.toString(inputStream), map.asJava))
+ finally CoreUtils.swallow(inputStream.close())
+ }
+
+ val bindAddress = config.getProperty(MiniKdc.KdcBindAddress)
+ addInitialEntriesToDirectoryService(bindAddress)
+
+ val kerberosConfig = new KerberosConfig
+ kerberosConfig.setMaximumRenewableLifetime(config.getProperty(MiniKdc.MaxRenewableLifetime).toLong)
+ kerberosConfig.setMaximumTicketLifetime(config.getProperty(MiniKdc.MaxTicketLifetime).toLong)
+ kerberosConfig.setSearchBaseDn(dnString)
+ kerberosConfig.setPaEncTimestampRequired(false)
+ kdc = new KdcServer(kerberosConfig)
+ kdc.setDirectoryService(ds)
+
+ // transport
+ val transport = config.getProperty(MiniKdc.Transport)
+ val absTransport = transport.trim match {
+ case "TCP" => new TcpTransport(bindAddress, port, 3, 50)
+ case "UDP" => new UdpTransport(port)
+ case _ => throw new IllegalArgumentException(s"Invalid transport: $transport")
+ }
+ kdc.addTransports(absTransport)
+ kdc.setServiceName(config.getProperty(MiniKdc.Instance))
+ kdc.start()
+
+ // if using ephemeral port, update port number for binding
+ if (port == 0)
+ _port = absTransport.getAcceptor.getLocalAddress.asInstanceOf[InetSocketAddress].getPort
+
+ info(s"MiniKdc listening at port: $port")
+ }
+
+ private def initJvmKerberosConfig(): Unit = {
+ writeKrb5Conf()
+ System.setProperty(MiniKdc.JavaSecurityKrb5Conf, krb5conf.getAbsolutePath)
+ System.setProperty(MiniKdc.SunSecurityKrb5Debug, config.getProperty(MiniKdc.Debug, "false"))
+ info(s"MiniKdc setting JVM krb5.conf to: ${krb5conf.getAbsolutePath}")
+ refreshJvmKerberosConfig()
+ }
+
+ private def writeKrb5Conf() {
+ val stringBuilder = new StringBuilder
+ val reader = new BufferedReader(
+ new InputStreamReader(MiniKdc.getResourceAsStream("minikdc-krb5.conf"), StandardCharsets.UTF_8))
+ try {
+ var line: String = null
+ while ({line = reader.readLine(); line != null}) {
+ stringBuilder.append(line).append("{3}")
+ }
+ } finally CoreUtils.swallow(reader.close())
+ val output = MessageFormat.format(stringBuilder.toString, realm, host, port.toString, System.lineSeparator())
+ Files.write(krb5conf.toPath, output.getBytes(StandardCharsets.UTF_8))
+ }
+
+ private def refreshJvmKerberosConfig(): Unit = {
+ val klass =
+ if (System.getProperty("java.vendor").contains("IBM"))
+ Class.forName("com.ibm.security.krb5.internal.Config")
+ else
+ Class.forName("sun.security.krb5.Config")
+ klass.getMethod("refresh").invoke(klass)
+ }
+
+ def stop() {
+ if (kdc != null) {
+ System.clearProperty(MiniKdc.JavaSecurityKrb5Conf)
+ System.clearProperty(MiniKdc.SunSecurityKrb5Debug)
+ kdc.stop()
+ try ds.shutdown()
+ catch {
+ case ex: Exception => error("Could not shutdown ApacheDS properly", ex)
+ }
+ }
+ }
+
+ /**
+ * Creates a principal in the KDC with the specified user and password.
+ *
+ * An exception will be thrown if the principal cannot be created.
+ *
+ * @param principal principal name, do not include the domain.
+ * @param password password.
+ */
+ private def createPrincipal(principal: String, password: String) {
+ val ldifContent = s"""
+ |dn: uid=$principal,ou=users,dc=${orgName.toLowerCase(Locale.ENGLISH)},dc=${orgDomain.toLowerCase(Locale.ENGLISH)}
+ |objectClass: top
+ |objectClass: person
+ |objectClass: inetOrgPerson
+ |objectClass: krb5principal
+ |objectClass: krb5kdcentry
+ |cn: $principal
+ |sn: $principal
+ |uid: $principal
+ |userPassword: $password
+ |krb5PrincipalName: ${principal}@${realm}
+ |krb5KeyVersionNumber: 0""".stripMargin
+ addEntriesToDirectoryService(ldifContent)
+ }
+
+ /**
+ * Creates multiple principals in the KDC and adds them to a keytab file.
+ *
+ * An exception will be thrown if the principal cannot be created.
+ *
+ * @param keytabFile keytab file to add the created principals
+ * @param principals principals to add to the KDC, do not include the domain.
+ */
+ def createPrincipal(keytabFile: File, principals: String*) {
+ val generatedPassword = UUID.randomUUID.toString
+ val keytab = new Keytab
+ val entries = principals.flatMap { principal =>
+ createPrincipal(principal, generatedPassword)
+ val principalWithRealm = s"${principal}@${realm}"
+ val timestamp = new KerberosTime
+ KerberosKeyFactory.getKerberosKeys(principalWithRealm, generatedPassword).asScala.values.map { encryptionKey =>
+ val keyVersion = encryptionKey.getKeyVersion.toByte
+ new KeytabEntry(principalWithRealm, 1, timestamp, keyVersion, encryptionKey)
+ }
+ }
+ keytab.setEntries(entries.asJava)
+ keytab.write(keytabFile)
+ }
+
+ private def addEntriesToDirectoryService(ldifContent: String): Unit = {
+ val reader = new LdifReader(new StringReader(ldifContent))
+ try {
+ for (ldifEntry <- reader.asScala)
+ ds.getAdminSession.add(new DefaultEntry(ds.getSchemaManager, ldifEntry.getEntry))
+ } finally CoreUtils.swallow(reader.close())
+ }
+
+}
+
+object MiniKdc {
+
+ val JavaSecurityKrb5Conf = "java.security.krb5.conf"
+ val SunSecurityKrb5Debug = "sun.security.krb5.debug"
+
+ def main(args: Array[String]) {
+ args match {
+ case Array(workDirPath, configPath, keytabPath, principals@ _*) if principals.nonEmpty =>
+ val workDir = new File(workDirPath)
+ if (!workDir.exists)
+ throw new RuntimeException(s"Specified work directory does not exist: ${workDir.getAbsolutePath}")
+ val config = createConfig
+ val configFile = new File(configPath)
+ if (!configFile.exists)
+ throw new RuntimeException(s"Specified configuration does not exist: ${configFile.getAbsolutePath}")
+
+ val userConfig = Utils.loadProps(configFile.getAbsolutePath)
+ userConfig.asScala.foreach { case (key, value) =>
+ config.put(key, value)
+ }
+ val keytabFile = new File(keytabPath).getAbsoluteFile
+ start(workDir, config, keytabFile, principals)
+ case _ =>
+ println("Arguments: <WORKDIR> <MINIKDCPROPERTIES> <KEYTABFILE> [<PRINCIPALS>]+")
+ sys.exit(1)
+ }
+ }
+
+ private def start(workDir: File, config: Properties, keytabFile: File, principals: Seq[String]) {
+ val miniKdc = new MiniKdc(config, workDir)
+ miniKdc.start()
+ miniKdc.createPrincipal(keytabFile, principals: _*)
+ val infoMessage = s"""
+ |
+ |Standalone MiniKdc Running
+ |---------------------------------------------------
+ | Realm : ${miniKdc.realm}
+ | Running at : ${miniKdc.host}:${miniKdc.port}
+ | krb5conf : ${miniKdc.krb5conf}
+ |
+ | created keytab : $keytabFile
+ | with principals : ${principals.mkString(", ")}
+ |
+ |Hit <CTRL-C> or kill <PID> to stop it
+ |---------------------------------------------------
+ |
+ """.stripMargin
+ println(infoMessage)
+ Runtime.getRuntime.addShutdownHook(CoreUtils.newThread("minikdc-shutdown-hook", daemon = false) {
+ miniKdc.stop()
+ })
+ }
+
+ val OrgName = "org.name"
+ val OrgDomain = "org.domain"
+ val KdcBindAddress = "kdc.bind.address"
+ val KdcPort = "kdc.port"
+ val Instance = "instance"
+ val MaxTicketLifetime = "max.ticket.lifetime"
+ val MaxRenewableLifetime = "max.renewable.lifetime"
+ val Transport = "transport"
+ val Debug = "debug"
+
+ private val RequiredProperties = Set(OrgName, OrgDomain, KdcBindAddress, KdcPort, Instance, Transport,
+ MaxTicketLifetime, MaxRenewableLifetime)
+
+ private val DefaultConfig = Map(
+ KdcBindAddress -> "localhost",
+ KdcPort -> "0",
+ Instance -> "DefaultKrbServer",
+ OrgName -> "Example",
+ OrgDomain -> "COM",
+ Transport -> "TCP",
+ MaxTicketLifetime -> "86400000",
+ MaxRenewableLifetime -> "604800000",
+ Debug -> "false"
+ )
+
+ /**
+ * Convenience method that returns MiniKdc default configuration.
+ *
+ * The returned configuration is a copy, it can be customized before using
+ * it to create a MiniKdc.
+ */
+ def createConfig: Properties = {
+ val properties = new Properties
+ DefaultConfig.foreach { case (k, v) => properties.setProperty(k, v) }
+ properties
+ }
+
+ @throws[IOException]
+ def getResourceAsStream(resourceName: String): InputStream = {
+ val cl = Option(Thread.currentThread.getContextClassLoader).getOrElse(classOf[MiniKdc].getClassLoader)
+ Option(cl.getResourceAsStream(resourceName)).getOrElse {
+ throw new IOException(s"Can not read resource file `$resourceName`")
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 676772f..2ca64f2 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -77,17 +77,15 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
// The following method does nothing by default, but
// if the test case requires setting up a cluster ACL,
// then it needs to be implemented.
- setClusterAcl match {
- case Some(f) =>
- f()
- case None => // Nothing to do
- }
+ setClusterAcl.foreach(_.apply)
}
@After
override def tearDown() {
- servers.foreach(_.shutdown())
- servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+ if (servers != null) {
+ servers.foreach(_.shutdown())
+ servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+ }
super.tearDown
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 47158d6..6ed317a 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -23,6 +23,8 @@ ext {
}
versions += [
+ apacheda: "1.0.0-M33",
+ apacheds: "2.0.0-M21",
argparse4j: "0.5.0",
bcpkix: "1.54",
hadoop: "2.7.2",
@@ -65,8 +67,16 @@ versions["baseScala"] = versions.scala.substring(0, versions.scala.lastIndexOf("
libs += [
argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j",
+ apacheda: "org.apache.directory.api:api-all:$versions.apacheda",
+ apachedsCoreApi: "org.apache.directory.server:apacheds-core-api:$versions.apacheds",
+ apachedsInterceptorKerberos: "org.apache.directory.server:apacheds-interceptor-kerberos:$versions.apacheds",
+ apachedsProtocolShared: "org.apache.directory.server:apacheds-protocol-shared:$versions.apacheds",
+ apachedsProtocolKerberos: "org.apache.directory.server:apacheds-protocol-kerberos:$versions.apacheds",
+ apachedsProtocolLdap: "org.apache.directory.server:apacheds-protocol-ldap:$versions.apacheds",
+ apachedsLdifPartition: "org.apache.directory.server:apacheds-ldif-partition:$versions.apacheds",
+ apachedsMavibotPartition: "org.apache.directory.server:apacheds-mavibot-partition:$versions.apacheds",
+ apachedsJdbmPartition: "org.apache.directory.server:apacheds-jdbm-partition:$versions.apacheds",
bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
- hadoopMiniKdc: "org.apache.hadoop:hadoop-minikdc:$versions.hadoop",
easymock: "org.easymock:easymock:$versions.easymock",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/tests/kafkatest/services/security/minikdc.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/minikdc.py b/tests/kafkatest/services/security/minikdc.py
index 3b3a5f1..b376e26 100644
--- a/tests/kafkatest/services/security/minikdc.py
+++ b/tests/kafkatest/services/security/minikdc.py
@@ -67,10 +67,9 @@ class MiniKdc(Service):
principals = 'client ' + kafka_principals + self.extra_principals
self.logger.info("Starting MiniKdc with principals " + principals)
- lib_dir = "/opt/%s/core/build/dependant-testlibs" % kafka_dir(node)
- kdc_jars = node.account.ssh_capture("ls " + lib_dir)
- classpath = ":".join([os.path.join(lib_dir, jar.strip()) for jar in kdc_jars])
- cmd = "CLASSPATH=%s /opt/%s/bin/kafka-run-class.sh org.apache.hadoop.minikdc.MiniKdc %s %s %s %s 1>> %s 2>> %s &" % (classpath, kafka_dir(node), MiniKdc.WORK_DIR, MiniKdc.PROPS_FILE, MiniKdc.KEYTAB_FILE, principals, MiniKdc.LOG_FILE, MiniKdc.LOG_FILE)
+ jar_paths = self.core_jar_paths(node, "dependant-testlibs") + self.core_jar_paths(node, "libs")
+ classpath = ":".join(jar_paths)
+ cmd = "CLASSPATH=%s /opt/%s/bin/kafka-run-class.sh kafka.security.minikdc.MiniKdc %s %s %s %s 1>> %s 2>> %s &" % (classpath, kafka_dir(node), MiniKdc.WORK_DIR, MiniKdc.PROPS_FILE, MiniKdc.KEYTAB_FILE, principals, MiniKdc.LOG_FILE, MiniKdc.LOG_FILE)
self.logger.debug("Attempting to start MiniKdc on %s with command: %s" % (str(node.account), cmd))
with node.account.monitor_log(MiniKdc.LOG_FILE) as monitor:
node.account.ssh(cmd)
@@ -82,6 +81,11 @@ class MiniKdc(Service):
#KDC is set to bind openly (via 0.0.0.0). Change krb5.conf to hold the specific KDC address
self.replace_in_file(MiniKdc.LOCAL_KRB5CONF_FILE, '0.0.0.0', node.account.hostname)
+ def core_jar_paths(self, node, lib_dir_name):
+ lib_dir = "/opt/%s/core/build/%s" % (kafka_dir(node), lib_dir_name)
+ jars = node.account.ssh_capture("ls " + lib_dir)
+ return [os.path.join(lib_dir, jar.strip()) for jar in jars]
+
def stop_node(self, node):
self.logger.info("Stopping %s on %s" % (type(self).__name__, node.account.hostname))
node.account.kill_process("apacheds", allow_fail=False)