Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:24 UTC

[08/50] [abbrv] kafka git commit: KAFKA-3475; Introduce our own `MiniKdc`

KAFKA-3475; Introduce our own `MiniKdc`

This also fixes KAFKA-3453 and KAFKA-2866.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Gwen Shapira

Closes #1155 from ijuma/kafka-3475-introduce-our-minikdc


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/78d91dcd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/78d91dcd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/78d91dcd

Branch: refs/heads/0.10.0
Commit: 78d91dcd8805d850038df52718380a6f956abad7
Parents: 2788f2d
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Mar 30 19:30:34 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Mar 30 19:30:34 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |  15 +-
 .../org/apache/kafka/common/utils/Utils.java    |   2 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala |  15 +-
 core/src/test/resources/minikdc-krb5.conf       |  25 ++
 core/src/test/resources/minikdc.ldiff           |  47 ++
 .../scala/integration/kafka/api/SaslSetup.scala |  10 +-
 .../scala/kafka/security/minikdc/MiniKdc.scala  | 433 +++++++++++++++++++
 .../integration/KafkaServerTestHarness.scala    |  12 +-
 gradle/dependencies.gradle                      |  12 +-
 tests/kafkatest/services/security/minikdc.py    |  12 +-
 10 files changed, 557 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 13a8b4e..d6f82a4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -338,7 +338,17 @@ project(':core') {
     testCompile project(':clients').sourceSets.test.output
     testCompile libs.bcpkix
     testCompile libs.easymock
-    testCompile libs.hadoopMiniKdc
+    testCompile(libs.apacheda) {
+      exclude group: 'xml-apis', module: 'xml-apis'
+    }
+    testCompile libs.apachedsCoreApi
+    testCompile libs.apachedsInterceptorKerberos
+    testCompile libs.apachedsProtocolShared
+    testCompile libs.apachedsProtocolKerberos
+    testCompile libs.apachedsProtocolLdap
+    testCompile libs.apachedsLdifPartition
+    testCompile libs.apachedsMavibotPartition
+    testCompile libs.apachedsJdbmPartition
     testCompile libs.junit
     testCompile libs.scalaTest
   }
@@ -368,6 +378,9 @@ project(':core') {
     duplicatesStrategy 'exclude'
   }
 
+  systemTestLibs {
+    dependsOn testJar
+  }
 
   task genProtocolErrorDocs(type: JavaExec) {
     classpath = sourceSets.main.runtimeClasspath

http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 0167548..2a98822 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -476,7 +476,7 @@ public class Utils {
      * @param daemon Should the thread block JVM shutdown?
      * @return The unstarted thread
      */
-    public static Thread newThread(String name, Runnable runnable, Boolean daemon) {
+    public static Thread newThread(String name, Runnable runnable, boolean daemon) {
         Thread thread = new Thread(runnable, name);
         thread.setDaemon(daemon);
         thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
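
The change above narrows the `daemon` parameter from the boxed `Boolean` to the primitive `boolean`: a boxed value can be null and would only fail inside the method when unboxed at `setDaemon`, while the primitive rules that out at the call site and avoids autoboxing. A minimal sketch of a call against the new signature (caller code is hypothetical, not part of the patch):

    import org.apache.kafka.common.utils.Utils

    // `daemon` is now a primitive boolean, so a null can no longer
    // reach thread.setDaemon and fail at unboxing time.
    val thread = Utils.newThread("example-worker", new Runnable {
      override def run(): Unit = println("running")
    }, true)
    thread.start()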

http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index fe2bebf..5b6c59f 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -57,13 +57,14 @@ object CoreUtils extends Logging {
     }
 
   /**
-   * Create a daemon thread
-   * @param name The name of the thread
-   * @param fun The function to execute in the thread
-   * @return The unstarted thread
-   */
-  def daemonThread(name: String, fun: => Unit): Thread =
-    Utils.daemonThread(name, runnable(fun))
+    * Create a thread
+    * @param name The name of the thread
+    * @param daemon If true, the thread runs as a daemon and will not block JVM shutdown
+    * @param fun The function to execute in the thread
+    * @return The unstarted thread
+    */
+  def newThread(name: String, daemon: Boolean)(fun: => Unit): Thread =
+    Utils.newThread(name, runnable(fun), daemon)
 
   /**
    * Do the given action and log any exceptions thrown without rethrowing them
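
`daemonThread` becomes a curried `newThread` that takes the daemon flag explicitly and the body as a by-name block, mirroring the `Utils.newThread` change above. A brief usage sketch (thread name and body are hypothetical):

    import kafka.utils.CoreUtils

    // The second parameter list is the code to run; `runnable(fun)` wraps it.
    val t = CoreUtils.newThread("example-worker", daemon = true) {
      println("doing work on a daemon thread")
    }
    t.start()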

http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/core/src/test/resources/minikdc-krb5.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/minikdc-krb5.conf b/core/src/test/resources/minikdc-krb5.conf
new file mode 100644
index 0000000..0603404
--- /dev/null
+++ b/core/src/test/resources/minikdc-krb5.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+[libdefaults]
+default_realm = {0}
+udp_preference_limit = 1
+
+[realms]
+{0} = '{'
+  kdc = {1}:{2}
+'}'
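
The `{0}`/`{1}`/`{2}` tokens in this template are `java.text.MessageFormat` placeholders for realm, host and port, and the quoted `'{'`/`'}'` yield literal braces; `writeKrb5Conf` in `MiniKdc.scala` (further down) additionally appends a `{3}` placeholder to each line for the platform line separator. A small sketch of the expansion (values hypothetical):

    import java.text.MessageFormat

    // The [realms] stanza of the template, with {3} as the line separator.
    val template = "[realms]{3}{0} = '{'{3}  kdc = {1}:{2}{3}'}'"
    val conf = MessageFormat.format(template, "EXAMPLE.COM", "localhost", "59239", System.lineSeparator())
    // conf now reads:
    //   [realms]
    //   EXAMPLE.COM = {
    //     kdc = localhost:59239
    //   }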

http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/core/src/test/resources/minikdc.ldiff
----------------------------------------------------------------------
diff --git a/core/src/test/resources/minikdc.ldiff b/core/src/test/resources/minikdc.ldiff
new file mode 100644
index 0000000..75e4dfd
--- /dev/null
+++ b/core/src/test/resources/minikdc.ldiff
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+dn: ou=users,dc=${0},dc=${1}
+objectClass: organizationalUnit
+objectClass: top
+ou: users
+
+dn: uid=krbtgt,ou=users,dc=${0},dc=${1}
+objectClass: top
+objectClass: person
+objectClass: inetOrgPerson
+objectClass: krb5principal
+objectClass: krb5kdcentry
+cn: KDC Service
+sn: Service
+uid: krbtgt
+userPassword: secret
+krb5PrincipalName: krbtgt/${2}.${3}@${2}.${3}
+krb5KeyVersionNumber: 0
+
+dn: uid=ldap,ou=users,dc=${0},dc=${1}
+objectClass: top
+objectClass: person
+objectClass: inetOrgPerson
+objectClass: krb5principal
+objectClass: krb5kdcentry
+cn: LDAP
+sn: Service
+uid: ldap
+userPassword: secret
+krb5PrincipalName: ldap/${4}@${2}.${3}
+krb5KeyVersionNumber: 0
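
The `${0}`…`${4}` tokens are `StrSubstitutor` placeholders; `initKdcServer` in `MiniKdc.scala` (further down) fills them with the lower- and upper-cased org name and domain plus the KDC bind address. A one-line sketch of the substitution (values hypothetical):

    import org.apache.commons.lang.text.StrSubstitutor
    import scala.collection.JavaConverters._

    val values = Map("0" -> "example", "1" -> "com", "2" -> "EXAMPLE", "3" -> "COM", "4" -> "localhost")
    val line = StrSubstitutor.replace("krb5PrincipalName: krbtgt/${2}.${3}@${2}.${3}", values.asJava)
    // line == "krb5PrincipalName: krbtgt/EXAMPLE.COM@EXAMPLE.COM"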

http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index c36b288..8255e6a 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -17,11 +17,11 @@
 
 package kafka.api
 
-import java.io.{File}
+import java.io.File
 import javax.security.auth.login.Configuration
 
-import kafka.utils.{JaasTestUtils,TestUtils}
-import org.apache.hadoop.minikdc.MiniKdc
+import kafka.security.minikdc.MiniKdc
+import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.kerberos.LoginManager
 
@@ -39,8 +39,8 @@ case object Both extends SaslSetupMode
  * currently to setup a keytab and jaas files.
  */
 trait SaslSetup {
-  private val workDir = new File(System.getProperty("test.dir", "build/tmp/test-workDir"))
-  private val kdcConf = MiniKdc.createConf()
+  private val workDir = TestUtils.tempDir()
+  private val kdcConf = MiniKdc.createConfig
   private val kdc = new MiniKdc(kdcConf, workDir)
 
   def startSasl(mode: SaslSetupMode = Both) {
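
Since the trait now creates a fresh `TestUtils.tempDir()` per instance, test classes no longer collide on a shared KDC work directory. A minimal sketch of a test mixing in the trait (class name hypothetical; real tests also need broker and JAAS wiring not shown here):

    // startSasl() uses the default Both mode declared above.
    class ExampleSaslTest extends SaslSetup {
      startSasl()
      // ... exercise Kerberos-enabled clients and brokers ...
    }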

http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
new file mode 100644
index 0000000..14807bc
--- /dev/null
+++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@@ -0,0 +1,433 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.security.minikdc
+
+import java.io._
+import java.net.InetSocketAddress
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.text.MessageFormat
+import java.util.{Locale, Properties, UUID}
+
+import kafka.utils.{CoreUtils, Logging}
+
+import scala.collection.JavaConverters._
+import org.apache.commons.io.IOUtils
+import org.apache.commons.lang.text.StrSubstitutor
+import org.apache.directory.api.ldap.model.entry.{DefaultEntry, Entry}
+import org.apache.directory.api.ldap.model.ldif.LdifReader
+import org.apache.directory.api.ldap.model.name.Dn
+import org.apache.directory.api.ldap.schema.extractor.impl.DefaultSchemaLdifExtractor
+import org.apache.directory.api.ldap.schema.loader.LdifSchemaLoader
+import org.apache.directory.api.ldap.schema.manager.impl.DefaultSchemaManager
+import org.apache.directory.server.constants.ServerDNConstants
+import org.apache.directory.server.core.DefaultDirectoryService
+import org.apache.directory.server.core.api.{CacheService, DirectoryService, InstanceLayout}
+import org.apache.directory.server.core.api.schema.SchemaPartition
+import org.apache.directory.server.core.kerberos.KeyDerivationInterceptor
+import org.apache.directory.server.core.partition.impl.btree.jdbm.{JdbmIndex, JdbmPartition}
+import org.apache.directory.server.core.partition.ldif.LdifPartition
+import org.apache.directory.server.kerberos.KerberosConfig
+import org.apache.directory.server.kerberos.kdc.KdcServer
+import org.apache.directory.server.kerberos.shared.crypto.encryption.KerberosKeyFactory
+import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry}
+import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport}
+import org.apache.directory.server.xdbm.Index
+import org.apache.directory.shared.kerberos.KerberosTime
+import org.apache.kafka.common.utils.Utils
+
+/**
+  * Mini KDC based on Apache Directory Server that can be embedded in tests or used from the command line as a
+  * standalone KDC.
+  *
+  * MiniKdc sets 2 System properties when started and unsets them when stopped:
+  *
+  * - java.security.krb5.conf: set to the MiniKdc realm/host/port
+  * - sun.security.krb5.debug: set to the debug value provided in the configuration
+  *
+  * As a result of this, multiple MiniKdc instances should not be started concurrently in the same JVM.
+  *
+  * MiniKdc default configuration values are:
+  *
+  * - org.name=Example (used to create the REALM)
+  * - org.domain=COM (used to create the REALM)
+  * - kdc.bind.address=localhost
+  * - kdc.port=0 (ephemeral port)
+  * - instance=DefaultKrbServer
+  * - max.ticket.lifetime=86400000 (1 day)
+  * - max.renewable.lifetime=604800000 (7 days)
+  * - transport=TCP
+  * - debug=false
+  *
+  * The generated krb5.conf forces TCP connections.
+  *
+  * Acknowledgements: this class is derived from the MiniKdc class in the hadoop-minikdc project (git commit
+  * d8d8ed35f00b15ee0f2f8aaf3fe7f7b42141286b).
+  *
+  * @constructor creates a new MiniKdc instance.
+  * @param config the MiniKdc configuration
+  * @param workDir the working directory which will contain krb5.conf, Apache DS files and any other files needed by
+  *                MiniKdc.
+  * @throws Exception thrown if the MiniKdc could not be created.
+  */
+class MiniKdc(config: Properties, workDir: File) extends Logging {
+
+  if (!config.keySet.containsAll(MiniKdc.RequiredProperties.asJava)) {
+    val missingProperties = MiniKdc.RequiredProperties.filterNot(config.keySet.asScala)
+    throw new IllegalArgumentException(s"Missing configuration properties: $missingProperties")
+  }
+
+  info("Configuration:")
+  info("---------------------------------------------------------------")
+  config.asScala.foreach { case (key, value) =>
+    info(s"\t$key: $value")
+  }
+  info("---------------------------------------------------------------")
+
+  private val orgName = config.getProperty(MiniKdc.OrgName)
+  private val orgDomain = config.getProperty(MiniKdc.OrgDomain)
+  private val dnString = s"dc=$orgName,dc=$orgDomain"
+  private val realm = s"${orgName.toUpperCase(Locale.ENGLISH)}.${orgDomain.toUpperCase(Locale.ENGLISH)}"
+  private val krb5conf = new File(workDir, "krb5.conf")
+
+  private var _port = config.getProperty(MiniKdc.KdcPort).toInt
+  private var ds: DirectoryService = null
+  private var kdc: KdcServer = null
+
+  def port: Int = _port
+
+  def host: String = config.getProperty(MiniKdc.KdcBindAddress)
+
+  def start() {
+    if (kdc != null)
+      throw new RuntimeException("KDC already started")
+    initDirectoryService()
+    initKdcServer()
+    initJvmKerberosConfig()
+  }
+
+  private def initDirectoryService() {
+    ds = new DefaultDirectoryService
+    ds.setInstanceLayout(new InstanceLayout(workDir))
+    ds.setCacheService(new CacheService)
+
+    // first load the schema
+    val instanceLayout = ds.getInstanceLayout
+    val schemaPartitionDirectory = new File(instanceLayout.getPartitionsDirectory, "schema")
+    val extractor = new DefaultSchemaLdifExtractor(instanceLayout.getPartitionsDirectory)
+    extractor.extractOrCopy
+
+    val loader = new LdifSchemaLoader(schemaPartitionDirectory)
+    val schemaManager = new DefaultSchemaManager(loader)
+    schemaManager.loadAllEnabled()
+    ds.setSchemaManager(schemaManager)
+    // Init the LdifPartition with schema
+    val schemaLdifPartition = new LdifPartition(schemaManager, ds.getDnFactory)
+    schemaLdifPartition.setPartitionPath(schemaPartitionDirectory.toURI)
+
+    // The schema partition
+    val schemaPartition = new SchemaPartition(schemaManager)
+    schemaPartition.setWrappedPartition(schemaLdifPartition)
+    ds.setSchemaPartition(schemaPartition)
+
+    val systemPartition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory)
+    systemPartition.setId("system")
+    systemPartition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, systemPartition.getId).toURI)
+    systemPartition.setSuffixDn(new Dn(ServerDNConstants.SYSTEM_DN))
+    systemPartition.setSchemaManager(ds.getSchemaManager)
+    ds.setSystemPartition(systemPartition)
+
+    ds.getChangeLog.setEnabled(false)
+    ds.setDenormalizeOpAttrsEnabled(true)
+    ds.addLast(new KeyDerivationInterceptor)
+
+    // create one partition
+    val orgName = config.getProperty(MiniKdc.OrgName).toLowerCase(Locale.ENGLISH)
+    val orgDomain = config.getProperty(MiniKdc.OrgDomain).toLowerCase(Locale.ENGLISH)
+    val partition = new JdbmPartition(ds.getSchemaManager, ds.getDnFactory)
+    partition.setId(orgName)
+    partition.setPartitionPath(new File(ds.getInstanceLayout.getPartitionsDirectory, orgName).toURI)
+    val dn = new Dn(dnString)
+    partition.setSuffixDn(dn)
+    ds.addPartition(partition)
+
+    // indexes
+    val indexedAttributes = Set[Index[_, String]](
+      new JdbmIndex[Entry]("objectClass", false),
+      new JdbmIndex[Entry]("dc", false),
+      new JdbmIndex[Entry]("ou", false)
+    ).asJava
+    partition.setIndexedAttributes(indexedAttributes)
+
+    // And start the ds
+    ds.setInstanceId(config.getProperty(MiniKdc.Instance))
+    ds.startup()
+
+    // context entry, after ds.startup()
+    val entry = ds.newEntry(dn)
+    entry.add("objectClass", "top", "domain")
+    entry.add("dc", orgName)
+    ds.getAdminSession.add(entry)
+  }
+
+  private def initKdcServer() {
+
+    def addInitialEntriesToDirectoryService(bindAddress: String) {
+      val map = Map (
+        "0" -> orgName.toLowerCase(Locale.ENGLISH),
+        "1" -> orgDomain.toLowerCase(Locale.ENGLISH),
+        "2" -> orgName.toUpperCase(Locale.ENGLISH),
+        "3" -> orgDomain.toUpperCase(Locale.ENGLISH),
+        "4" -> bindAddress
+      )
+      val inputStream = MiniKdc.getResourceAsStream("minikdc.ldiff")
+      try addEntriesToDirectoryService(StrSubstitutor.replace(IOUtils.toString(inputStream), map.asJava))
+      finally CoreUtils.swallow(inputStream.close())
+    }
+
+    val bindAddress = config.getProperty(MiniKdc.KdcBindAddress)
+    addInitialEntriesToDirectoryService(bindAddress)
+
+    val kerberosConfig = new KerberosConfig
+    kerberosConfig.setMaximumRenewableLifetime(config.getProperty(MiniKdc.MaxRenewableLifetime).toLong)
+    kerberosConfig.setMaximumTicketLifetime(config.getProperty(MiniKdc.MaxTicketLifetime).toLong)
+    kerberosConfig.setSearchBaseDn(dnString)
+    kerberosConfig.setPaEncTimestampRequired(false)
+    kdc = new KdcServer(kerberosConfig)
+    kdc.setDirectoryService(ds)
+
+    // transport
+    val transport = config.getProperty(MiniKdc.Transport)
+    val absTransport = transport.trim match {
+      case "TCP" => new TcpTransport(bindAddress, port, 3, 50)
+      case "UDP" => new UdpTransport(port)
+      case _ => throw new IllegalArgumentException(s"Invalid transport: $transport")
+    }
+    kdc.addTransports(absTransport)
+    kdc.setServiceName(config.getProperty(MiniKdc.Instance))
+    kdc.start()
+
+    // if using ephemeral port, update port number for binding
+    if (port == 0)
+      _port = absTransport.getAcceptor.getLocalAddress.asInstanceOf[InetSocketAddress].getPort
+
+    info(s"MiniKdc listening at port: $port")
+  }
+
+  private def initJvmKerberosConfig(): Unit = {
+    writeKrb5Conf()
+    System.setProperty(MiniKdc.JavaSecurityKrb5Conf, krb5conf.getAbsolutePath)
+    System.setProperty(MiniKdc.SunSecurityKrb5Debug, config.getProperty(MiniKdc.Debug, "false"))
+    info(s"MiniKdc setting JVM krb5.conf to: ${krb5conf.getAbsolutePath}")
+    refreshJvmKerberosConfig()
+  }
+
+  private def writeKrb5Conf() {
+    val stringBuilder = new StringBuilder
+    val reader = new BufferedReader(
+      new InputStreamReader(MiniKdc.getResourceAsStream("minikdc-krb5.conf"), StandardCharsets.UTF_8))
+    try {
+      var line: String = null
+      while ({line = reader.readLine(); line != null}) {
+        stringBuilder.append(line).append("{3}")
+      }
+    } finally CoreUtils.swallow(reader.close())
+    val output = MessageFormat.format(stringBuilder.toString, realm, host, port.toString, System.lineSeparator())
+    Files.write(krb5conf.toPath, output.getBytes(StandardCharsets.UTF_8))
+  }
+
+  private def refreshJvmKerberosConfig(): Unit = {
+    val klass =
+      if (System.getProperty("java.vendor").contains("IBM"))
+        Class.forName("com.ibm.security.krb5.internal.Config")
+      else
+        Class.forName("sun.security.krb5.Config")
+    klass.getMethod("refresh").invoke(klass)
+  }
+
+  def stop() {
+    if (kdc != null) {
+      System.clearProperty(MiniKdc.JavaSecurityKrb5Conf)
+      System.clearProperty(MiniKdc.SunSecurityKrb5Debug)
+      kdc.stop()
+      try ds.shutdown()
+      catch {
+        case ex: Exception => error("Could not shutdown ApacheDS properly", ex)
+      }
+    }
+  }
+
+  /**
+    * Creates a principal in the KDC with the specified user and password.
+    *
+    * An exception will be thrown if the principal cannot be created.
+    *
+    * @param principal principal name; do not include the domain.
+    * @param password  password.
+    */
+  private def createPrincipal(principal: String, password: String) {
+    val ldifContent = s"""
+      |dn: uid=$principal,ou=users,dc=${orgName.toLowerCase(Locale.ENGLISH)},dc=${orgDomain.toLowerCase(Locale.ENGLISH)}
+      |objectClass: top
+      |objectClass: person
+      |objectClass: inetOrgPerson
+      |objectClass: krb5principal
+      |objectClass: krb5kdcentry
+      |cn: $principal
+      |sn: $principal
+      |uid: $principal
+      |userPassword: $password
+      |krb5PrincipalName: ${principal}@${realm}
+      |krb5KeyVersionNumber: 0""".stripMargin
+    addEntriesToDirectoryService(ldifContent)
+  }
+
+  /**
+    * Creates multiple principals in the KDC and adds them to a keytab file.
+    *
+    * An exception will be thrown if any principal cannot be created.
+    *
+    * @param keytabFile keytab file to which the created principals are added
+    * @param principals principals to add to the KDC; do not include the domain.
+    */
+  def createPrincipal(keytabFile: File, principals: String*) {
+    val generatedPassword = UUID.randomUUID.toString
+    val keytab = new Keytab
+    val entries = principals.flatMap { principal =>
+      createPrincipal(principal, generatedPassword)
+      val principalWithRealm = s"${principal}@${realm}"
+      val timestamp = new KerberosTime
+      KerberosKeyFactory.getKerberosKeys(principalWithRealm, generatedPassword).asScala.values.map { encryptionKey =>
+        val keyVersion = encryptionKey.getKeyVersion.toByte
+        new KeytabEntry(principalWithRealm, 1, timestamp, keyVersion, encryptionKey)
+      }
+    }
+    keytab.setEntries(entries.asJava)
+    keytab.write(keytabFile)
+  }
+
+  private def addEntriesToDirectoryService(ldifContent: String): Unit = {
+    val reader = new LdifReader(new StringReader(ldifContent))
+    try {
+      for (ldifEntry <- reader.asScala)
+        ds.getAdminSession.add(new DefaultEntry(ds.getSchemaManager, ldifEntry.getEntry))
+    } finally CoreUtils.swallow(reader.close())
+  }
+
+}
+
+object MiniKdc {
+
+  val JavaSecurityKrb5Conf = "java.security.krb5.conf"
+  val SunSecurityKrb5Debug = "sun.security.krb5.debug"
+
+  def main(args: Array[String]) {
+    args match {
+      case Array(workDirPath, configPath, keytabPath, principals@ _*) if principals.nonEmpty =>
+        val workDir = new File(workDirPath)
+        if (!workDir.exists)
+          throw new RuntimeException(s"Specified work directory does not exist: ${workDir.getAbsolutePath}")
+        val config = createConfig
+        val configFile = new File(configPath)
+        if (!configFile.exists)
+          throw new RuntimeException(s"Specified configuration does not exist: ${configFile.getAbsolutePath}")
+
+        val userConfig = Utils.loadProps(configFile.getAbsolutePath)
+        userConfig.asScala.foreach { case (key, value) =>
+          config.put(key, value)
+        }
+        val keytabFile = new File(keytabPath).getAbsoluteFile
+        start(workDir, config, keytabFile, principals)
+      case _ =>
+        println("Arguments: <WORKDIR> <MINIKDCPROPERTIES> <KEYTABFILE> [<PRINCIPALS>]+")
+        sys.exit(1)
+    }
+  }
+
+  private def start(workDir: File, config: Properties, keytabFile: File, principals: Seq[String]) {
+    val miniKdc = new MiniKdc(config, workDir)
+    miniKdc.start()
+    miniKdc.createPrincipal(keytabFile, principals: _*)
+    val infoMessage = s"""
+      |
+      |Standalone MiniKdc Running
+      |---------------------------------------------------
+      |  Realm           : ${miniKdc.realm}
+      |  Running at      : ${miniKdc.host}:${miniKdc.port}
+      |  krb5conf        : ${miniKdc.krb5conf}
+      |
+      |  created keytab  : $keytabFile
+      |  with principals : ${principals.mkString(", ")}
+      |
+      |Hit <CTRL-C> or kill <PID> to stop it
+      |---------------------------------------------------
+      |
+    """.stripMargin
+    println(infoMessage)
+    Runtime.getRuntime.addShutdownHook(CoreUtils.newThread("minikdc-shutdown-hook", daemon = false) {
+      miniKdc.stop()
+    })
+  }
+
+  val OrgName = "org.name"
+  val OrgDomain = "org.domain"
+  val KdcBindAddress = "kdc.bind.address"
+  val KdcPort = "kdc.port"
+  val Instance = "instance"
+  val MaxTicketLifetime = "max.ticket.lifetime"
+  val MaxRenewableLifetime = "max.renewable.lifetime"
+  val Transport = "transport"
+  val Debug = "debug"
+
+  private val RequiredProperties = Set(OrgName, OrgDomain, KdcBindAddress, KdcPort, Instance, Transport,
+    MaxTicketLifetime, MaxRenewableLifetime)
+
+  private val DefaultConfig = Map(
+    KdcBindAddress -> "localhost",
+    KdcPort -> "0",
+    Instance -> "DefaultKrbServer",
+    OrgName -> "Example",
+    OrgDomain -> "COM",
+    Transport -> "TCP",
+    MaxTicketLifetime -> "86400000",
+    MaxRenewableLifetime -> "604800000",
+    Debug -> "false"
+  )
+
+  /**
+    * Convenience method that returns MiniKdc default configuration.
+    *
+    * The returned configuration is a copy, it can be customized before using
+    * it to create a MiniKdc.
+    */
+  def createConfig: Properties = {
+    val properties = new Properties
+    DefaultConfig.foreach { case (k, v) => properties.setProperty(k, v) }
+    properties
+  }
+
+  @throws[IOException]
+  def getResourceAsStream(resourceName: String): InputStream = {
+    val cl = Option(Thread.currentThread.getContextClassLoader).getOrElse(classOf[MiniKdc].getClassLoader)
+    Option(cl.getResourceAsStream(resourceName)).getOrElse {
+      throw new IOException(s"Cannot read resource file `$resourceName`")
+    }
+  }
+
+}
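
Putting the new class together, a brief sketch of embedding it in a test, following the scaladoc and `main` above (principal and file names hypothetical):

    import java.io.File
    import kafka.security.minikdc.MiniKdc
    import kafka.utils.TestUtils

    val config = MiniKdc.createConfig          // copy of the defaults; customize before use
    val workDir = TestUtils.tempDir()
    val kdc = new MiniKdc(config, workDir)
    kdc.start()                                // sets java.security.krb5.conf for this JVM
    val keytab = new File(workDir, "example.keytab")
    kdc.createPrincipal(keytab, "client", "kafka/localhost")
    // ... run Kerberos-enabled tests against kdc.host:kdc.port ...
    kdc.stop()                                 // clears the system properties again

The same class is the standalone entry point the system tests invoke below: `kafka.security.minikdc.MiniKdc <WORKDIR> <MINIKDCPROPERTIES> <KEYTABFILE> [<PRINCIPALS>]+`.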

http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 676772f..2ca64f2 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -77,17 +77,15 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
     // The following method does nothing by default, but
     // if the test case requires setting up a cluster ACL,
     // then it needs to be implemented.
-    setClusterAcl match {
-      case Some(f) =>
-        f()
-      case None => // Nothing to do
-    }
+    setClusterAcl.foreach(_.apply)
   }
 
   @After
   override def tearDown() {
-    servers.foreach(_.shutdown())
-    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    if (servers != null) {
+      servers.foreach(_.shutdown())
+      servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    }
     super.tearDown
   }
   

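The `setClusterAcl` change swaps an explicit `Option` pattern match for the equivalent `foreach`, and `tearDown` now tolerates a `setUp` that failed before `servers` was assigned. For reference, the two `Option` forms behave identically:

    // Sketch: Option.foreach versus matching Some/None explicitly.
    val setClusterAcl: Option[() => Unit] = Some(() => println("ACL configured"))
    setClusterAcl match {
      case Some(f) => f()              // old form
      case None    =>                  // nothing to do
    }
    setClusterAcl.foreach(_.apply())   // new form, same effect
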
http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 47158d6..6ed317a 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -23,6 +23,8 @@ ext {
 }
 
 versions += [
+  apacheda: "1.0.0-M33",
+  apacheds: "2.0.0-M21",
   argparse4j: "0.5.0",
   bcpkix: "1.54",
   hadoop: "2.7.2",
@@ -65,8 +67,16 @@ versions["baseScala"] = versions.scala.substring(0, versions.scala.lastIndexOf("
 
 libs += [
   argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j",
+  apacheda: "org.apache.directory.api:api-all:$versions.apacheda",
+  apachedsCoreApi: "org.apache.directory.server:apacheds-core-api:$versions.apacheds",
+  apachedsInterceptorKerberos: "org.apache.directory.server:apacheds-interceptor-kerberos:$versions.apacheds",
+  apachedsProtocolShared: "org.apache.directory.server:apacheds-protocol-shared:$versions.apacheds",
+  apachedsProtocolKerberos: "org.apache.directory.server:apacheds-protocol-kerberos:$versions.apacheds",
+  apachedsProtocolLdap: "org.apache.directory.server:apacheds-protocol-ldap:$versions.apacheds",
+  apachedsLdifPartition: "org.apache.directory.server:apacheds-ldif-partition:$versions.apacheds",
+  apachedsMavibotPartition: "org.apache.directory.server:apacheds-mavibot-partition:$versions.apacheds",
+  apachedsJdbmPartition: "org.apache.directory.server:apacheds-jdbm-partition:$versions.apacheds",
   bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
-  hadoopMiniKdc: "org.apache.hadoop:hadoop-minikdc:$versions.hadoop",
   easymock: "org.easymock:easymock:$versions.easymock",
   jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
   jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",

http://git-wip-us.apache.org/repos/asf/kafka/blob/78d91dcd/tests/kafkatest/services/security/minikdc.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/minikdc.py b/tests/kafkatest/services/security/minikdc.py
index 3b3a5f1..b376e26 100644
--- a/tests/kafkatest/services/security/minikdc.py
+++ b/tests/kafkatest/services/security/minikdc.py
@@ -67,10 +67,9 @@ class MiniKdc(Service):
         principals = 'client ' + kafka_principals + self.extra_principals
         self.logger.info("Starting MiniKdc with principals " + principals)
 
-        lib_dir = "/opt/%s/core/build/dependant-testlibs" % kafka_dir(node)
-        kdc_jars = node.account.ssh_capture("ls " + lib_dir)
-        classpath = ":".join([os.path.join(lib_dir, jar.strip()) for jar in kdc_jars])
-        cmd = "CLASSPATH=%s /opt/%s/bin/kafka-run-class.sh org.apache.hadoop.minikdc.MiniKdc %s %s %s %s 1>> %s 2>> %s &" % (classpath, kafka_dir(node), MiniKdc.WORK_DIR, MiniKdc.PROPS_FILE, MiniKdc.KEYTAB_FILE, principals, MiniKdc.LOG_FILE, MiniKdc.LOG_FILE)
+        jar_paths = self.core_jar_paths(node, "dependant-testlibs") + self.core_jar_paths(node, "libs")
+        classpath = ":".join(jar_paths)
+        cmd = "CLASSPATH=%s /opt/%s/bin/kafka-run-class.sh kafka.security.minikdc.MiniKdc %s %s %s %s 1>> %s 2>> %s &" % (classpath, kafka_dir(node), MiniKdc.WORK_DIR, MiniKdc.PROPS_FILE, MiniKdc.KEYTAB_FILE, principals, MiniKdc.LOG_FILE, MiniKdc.LOG_FILE)
         self.logger.debug("Attempting to start MiniKdc on %s with command: %s" % (str(node.account), cmd))
         with node.account.monitor_log(MiniKdc.LOG_FILE) as monitor:
             node.account.ssh(cmd)
@@ -82,6 +81,11 @@ class MiniKdc(Service):
         #KDC is set to bind openly (via 0.0.0.0). Change krb5.conf to hold the specific KDC address
         self.replace_in_file(MiniKdc.LOCAL_KRB5CONF_FILE, '0.0.0.0', node.account.hostname)
 
+    def core_jar_paths(self, node, lib_dir_name):
+        lib_dir = "/opt/%s/core/build/%s" % (kafka_dir(node), lib_dir_name)
+        jars = node.account.ssh_capture("ls " + lib_dir)
+        return [os.path.join(lib_dir, jar.strip()) for jar in jars]
+
     def stop_node(self, node):
         self.logger.info("Stopping %s on %s" % (type(self).__name__, node.account.hostname))
         node.account.kill_process("apacheds", allow_fail=False)