You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/28 21:20:33 UTC

kafka git commit: KAFKA-2640; Add tests for ZK authentication

Repository: kafka
Updated Branches:
  refs/heads/trunk 8838fa801 -> 983b1f9e1


KAFKA-2640; Add tests for ZK authentication

I've added a couple of initial tests to verify the functionality. I've tested that the JAAS config file loads properly and SASL with DIGEST-MD5 works with ZooKeeper.

Author: Flavio Junqueira <fp...@apache.org>
Author: flavio junqueira <fp...@apache.org>
Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #324 from fpj/KAFKA-2640


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/983b1f9e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/983b1f9e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/983b1f9e

Branch: refs/heads/trunk
Commit: 983b1f9e17a0f392cfb7785f572c767540a12e2b
Parents: 8838fa8
Author: Flavio Junqueira <fp...@apache.org>
Authored: Wed Oct 28 13:20:26 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Oct 28 13:20:26 2015 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/security/JaasUtils.java |   3 +-
 .../scala/kafka/admin/ZkSecurityMigrator.scala  |  14 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  17 +-
 core/src/test/resources/zk-digest-jaas.conf     |  11 +
 .../security/auth/ZkAuthorizationTest.scala     | 317 +++++++++++++++++++
 .../scala/unit/kafka/zk/EmbeddedZookeeper.scala |   2 +
 .../scala/unit/kafka/zk/ZKEphemeralTest.scala   |  75 +++--
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |  59 +++-
 8 files changed, 460 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
index dade986..c081a76 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
@@ -87,7 +87,7 @@ public class JaasUtils {
 
     public static boolean isZkSecurityEnabled(String loginConfigFile) {
         boolean isSecurityEnabled = false;
-        boolean zkSaslEnabled = Boolean.getBoolean(System.getProperty(ZK_SASL_CLIENT, "true"));
+        boolean zkSaslEnabled = Boolean.parseBoolean(System.getProperty(ZK_SASL_CLIENT, "true"));
         String zkLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, "Client");
 
         if (loginConfigFile != null && loginConfigFile.length() > 0) {
@@ -95,6 +95,7 @@ public class JaasUtils {
             if (!configFile.canRead()) {
                 throw new KafkaException("File " + loginConfigFile + "cannot be read.");
             }
+                
             try {
                 URI configUri = configFile.toURI();
                 Configuration loginConf = Configuration.getInstance("JavaLoginConfig", new URIParameter(configUri));

http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index fce5c03..e3ab7f2 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -68,7 +68,6 @@ object ZkSecurityMigrator extends Logging {
   def run(args: Array[String]) {
     var jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     val parser = new OptionParser()
-
     val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure."
         + " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String])
     val jaasFileOpt = parser.accepts("jaas.file", "JAAS Config file.").withOptionalArg().ofType(classOf[String])
@@ -157,8 +156,12 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
       Code.get(rc) match {
         case Code.OK =>
           // Set ACL for each child
-          for (child <- children.asScala)
-            setAclsRecursively(s"$path/$child")
+          children.asScala.map { child =>
+            path match {
+              case "/" => s"/$child"
+              case path => s"$path/$child"
+            }
+          }.foreach(setAclsRecursively)
           promise success "done"
         case Code.CONNECTIONLOSS =>
           zkHandle.getChildren(path, false, GetChildrenCallback, ctx)
@@ -211,9 +214,9 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
       for (path <- zkUtils.securePersistentZkPaths) {
         debug("Going to set ACL for %s".format(path))
         zkUtils.makeSurePersistentPathExists(path)
-        setAclsRecursively(path)
       }
-
+      setAclsRecursively("/")
+      
       @tailrec
       def recurse(): Unit = {
         val future = futures.synchronized { 
@@ -233,5 +236,4 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
       zkUtils.close
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index a39e61c..30d45a3 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -91,7 +91,8 @@ object ZkUtils {
   }
   
   def DefaultAcls(isSecure: Boolean): java.util.List[ACL] = if (isSecure) {
-    val list = ZooDefs.Ids.CREATOR_ALL_ACL
+    val list = new java.util.ArrayList[ACL]
+    list.addAll(ZooDefs.Ids.CREATOR_ALL_ACL)
     list.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)
     list
   } else {
@@ -963,6 +964,9 @@ class ZKCheckedEphemeral(path: String,
         case Code.SESSIONEXPIRED =>
           error("Session has expired while creating %s".format(path))
           setResult(Code.SESSIONEXPIRED)
+        case Code.INVALIDACL =>
+          error("Invalid ACL")
+          setResult(Code.INVALIDACL)
         case _ =>
           warn("ZooKeeper event while creating registration node: %s %s".format(path, Code.get(rc)))
           setResult(Code.get(rc))
@@ -988,6 +992,9 @@ class ZKCheckedEphemeral(path: String,
           case Code.SESSIONEXPIRED =>
             error("Session has expired while reading znode %s".format(path))
             setResult(Code.SESSIONEXPIRED)
+          case Code.INVALIDACL =>
+            error("Invalid ACL")
+            setResult(Code.INVALIDACL)
           case _ =>
             warn("ZooKeeper event while getting znode data: %s %s".format(path, Code.get(rc)))
             setResult(Code.get(rc))
@@ -1011,7 +1018,7 @@ class ZKCheckedEphemeral(path: String,
     } else {
       zkHandle.create(prefix,
                       new Array[Byte](0),
-                      ZkUtils. DefaultAcls(isSecure),
+                      DefaultAcls(isSecure),
                       CreateMode.PERSISTENT,
                       new StringCallback() {
                         def processResult(rc : Int,
@@ -1031,6 +1038,9 @@ class ZKCheckedEphemeral(path: String,
                             case Code.SESSIONEXPIRED =>
                               error("Session has expired while creating %s".format(path))
                               setResult(Code.get(rc))
+                            case Code.INVALIDACL =>
+                              error("Invalid ACL")
+                              setResult(Code.INVALIDACL)
                             case _ =>
                               warn("ZooKeeper event while creating registration node: %s %s".format(path, Code.get(rc)))
                               setResult(Code.get(rc))
@@ -1068,7 +1078,8 @@ class ZKCheckedEphemeral(path: String,
     }
     val prefix = path.substring(0, index)
     val suffix = path.substring(index, path.length)
-    debug("Path: %s, Prefix: %s, Suffix: %s".format(path, prefix, suffix))
+    debug(s"Path: $path, Prefix: $prefix, Suffix: $suffix")
+    info(s"Creating $path (is it secure? $isSecure)")
     createRecursive(prefix, suffix)
     val result = waitUntilResolved()
     info("Result of znode creation is: %s".format(result))

http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/test/resources/zk-digest-jaas.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/zk-digest-jaas.conf b/core/src/test/resources/zk-digest-jaas.conf
new file mode 100644
index 0000000..6020cd2
--- /dev/null
+++ b/core/src/test/resources/zk-digest-jaas.conf
@@ -0,0 +1,11 @@
+Server {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       user_super="adminsecret"
+       user_fpj="fpjsecret";
+};
+
+Client {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       username="fpj"
+       password="fpjsecret";
+};

http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
new file mode 100644
index 0000000..5f3c4ce
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security.auth
+
+import kafka.admin.ZkSecurityMigrator
+import kafka.utils.{Logging, ZkUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.data.{ACL, Stat}
+import org.junit.Assert._
+import org.junit.{After, Before, BeforeClass, Test}
+import scala.collection.JavaConverters._
+import scala.util.{Try, Success, Failure}
+
+
+class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
+  val jaasFile: String = "zk-digest-jaas.conf"
+  val authProvider: String = "zookeeper.authProvider.1"
+  @Before
+  override def setUp() {
+    val classLoader = getClass.getClassLoader
+    val filePath = classLoader.getResource(jaasFile).getPath
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, filePath)
+    System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
+    super.setUp()
+  }
+
+  @After
+  override def tearDown() {
+    super.tearDown()
+    System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    System.clearProperty(authProvider)
+  }
+
+  /**
+   * Tests the method in JaasUtils that checks whether to use
+   * secure ACLs and authentication with ZooKeeper.
+   */
+  @Test
+  def testIsZkSecurityEnabled() {
+    assertTrue(JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+    assertFalse(JaasUtils.isZkSecurityEnabled(""))
+    try {
+      JaasUtils.isZkSecurityEnabled("no-such-file-exists.conf")
+      fail("Should have thrown an exception")
+    } catch {
+      case e: KafkaException => {
+        // Expected
+      }
+      case e: Exception => {
+        fail(e.toString)
+      }
+    }
+  }
+
+  /**
+   * Exercises the code in ZkUtils. The goal is mainly
+   * to verify that the behavior of ZkUtils is correct
+   * when isSecure is set to true.
+   */
+  @Test
+  def testZkUtils() {
+    assertTrue(zkUtils.isSecure)
+    for (path <- zkUtils.persistentZkPaths) {
+      zkUtils.makeSurePersistentPathExists(path)
+      if(!path.equals(ZkUtils.ConsumersPath)) {
+        val aclList = (zkUtils.zkConnection.getAcl(path)).getKey
+        assertTrue(aclList.size == 2)
+        for (acl: ACL <- aclList.asScala) {
+          assertTrue(isAclSecure(acl))
+        }
+      }
+    }
+    // Test that can create: createEphemeralPathExpectConflict
+    zkUtils.createEphemeralPathExpectConflict("/a", "")
+    verify("/a")
+    // Test that can create: createPersistentPath
+    zkUtils.createPersistentPath("/b")
+    verify("/b")
+    // Test that can create: createSequentialPersistentPath
+    val seqPath = zkUtils.createSequentialPersistentPath("/c", "")
+    verify(seqPath)
+    // Test that can update: updateEphemeralPath
+    zkUtils.updateEphemeralPath("/a", "updated")
+    val valueA: String = zkUtils.zkClient.readData("/a")
+    assertTrue(valueA.equals("updated"))
+    // Test that can update: updatePersistentPath
+    zkUtils.updatePersistentPath("/b", "updated")
+    val valueB: String = zkUtils.zkClient.readData("/b")
+    assertTrue(valueB.equals("updated"))
+
+    info("Leaving testZkUtils")
+  }
+
+  /**
+   * Tests the migration tool when making an unsecure
+   * cluster secure.
+   */
+  @Test
+  def testZkMigration() {
+    val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false) 
+    try {
+      testMigration(unsecureZkUtils, zkUtils)
+    } finally {
+      unsecureZkUtils.close()
+    }
+  }
+
+  /**
+   * Tests the migration tool when making a secure
+   * cluster unsecure.
+   */
+  @Test
+  def testZkAntiMigration() {
+    val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
+    try {
+      testMigration(zkUtils, unsecureZkUtils)
+    } finally {
+      unsecureZkUtils.close()
+    }
+  }
+
+  /**
+   * Tests that the persistent paths cannot be deleted.
+   */
+  @Test
+  def testDelete() {
+    info(s"zkConnect string: $zkConnect")
+    ZkSecurityMigrator.run(Array("--zookeeper.acl=secure", s"--zookeeper.connect=$zkConnect"))
+    deleteAllUnsecure()
+  }
+
+  /**
+   * Tests that znodes cannot be deleted when the 
+   * persistent paths have children.
+   */
+  @Test
+  def testDeleteRecursive() {
+    info(s"zkConnect string: $zkConnect")
+    for (path <- zkUtils.securePersistentZkPaths) {
+      info(s"Creating $path")
+      zkUtils.makeSurePersistentPathExists(path)
+      zkUtils.createPersistentPath(s"$path/fpjwashere", "")
+    }
+    zkUtils.zkConnection.setAcl("/", zkUtils.DefaultAcls, -1)
+    deleteAllUnsecure()
+  }
+  
+  /**
+   * Tests the migration tool when chroot is being used.
+   */
+  @Test
+  def testChroot {
+    zkUtils.createPersistentPath("/kafka")
+    val unsecureZkUtils = ZkUtils(zkConnect + "/kafka", 6000, 6000, false)
+    val secureZkUtils = ZkUtils(zkConnect + "/kafka", 6000, 6000, true)
+    try {
+      testMigration(unsecureZkUtils, secureZkUtils)
+    } finally {
+      unsecureZkUtils.close()
+      secureZkUtils.close()
+    }
+  }
+
+  /**
+   * Exercises the migration tool. It is used by three test cases:
+   * testZkMigration, testZkAntiMigration and testChroot.
+   */
+  private def testMigration(firstZk: ZkUtils, secondZk: ZkUtils) {
+    info(s"zkConnect string: $zkConnect")
+    for (path <- firstZk.securePersistentZkPaths) {
+      info(s"Creating $path")
+      firstZk.makeSurePersistentPathExists(path)
+      // Create a child for each znode to exercise the recursive
+      // traversal of the data tree
+      firstZk.createPersistentPath(s"$path/fpjwashere", "")
+    }
+    val secureOpt: String  = secondZk.isSecure match {
+      case true => "secure"
+      case false => "unsecure"
+    }
+    ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkConnect"))
+    info("Done with migration")
+    for (path <- secondZk.securePersistentZkPaths) {
+      val listParent = (secondZk.zkConnection.getAcl(path)).getKey
+      assertTrue(path, isAclCorrect(listParent, secondZk.isSecure))
+
+      val childPath = path + "/fpjwashere"
+      val listChild = (secondZk.zkConnection.getAcl(childPath)).getKey
+      assertTrue(childPath, isAclCorrect(listChild, secondZk.isSecure))
+    }
+  }
+
+  /**
+   * Verifies that the path has the appropriate secure ACL.
+   */
+  private def verify(path: String): Boolean = {
+    val list = (zkUtils.zkConnection.getAcl(path)).getKey
+    list.asScala.forall(isAclSecure)
+  }
+
+  /**
+   * Verifies ACL.
+   */
+  private def isAclCorrect(list: java.util.List[ACL], secure: Boolean): Boolean = {
+    val isListSizeCorrect = secure match {
+      case true => list.size == 2
+      case false => list.size == 1
+    } 
+    isListSizeCorrect && list.asScala.forall(
+        secure match {
+          case true => isAclSecure
+          case false => isAclUnsecure
+        })
+  }
+  
+  /**
+   * Verifies that this ACL is the secure one. The
+   * values are based on the constants used in the 
+   * ZooKeeper code base.
+   */
+  private def isAclSecure(acl: ACL): Boolean = {
+    info(s"ACL $acl")
+    acl.getPerms match {
+      case 1 => {
+        acl.getId.getScheme.equals("world")
+      }
+      case 31 => {
+        acl.getId.getScheme.equals("sasl")
+      }
+      case _: Int => {
+        false
+      }
+    }
+  }
+  
+  /**
+   * Verifies that the ACL corresponds to the unsecure one.
+   */
+  private def isAclUnsecure(acl: ACL): Boolean = {
+    info(s"ACL $acl")
+    acl.getPerms match {
+      case 31 => {
+        acl.getId.getScheme.equals("world")
+      }
+      case _: Int => {
+        false
+      }
+    }
+  }
+  
+  /**
+   * Sets up and starts the recursive execution of deletes.
+   * This is used in the testDelete and testDeleteRecursive
+   * test cases.
+   */
+  private def deleteAllUnsecure() {
+    System.setProperty(JaasUtils.ZK_SASL_CLIENT, "false")
+    val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
+    val result: Try[Boolean] = {
+      deleteRecursive(unsecureZkUtils, "/")
+    }
+    // Clean up before leaving the test case
+    unsecureZkUtils.close()
+    System.clearProperty(JaasUtils.ZK_SASL_CLIENT)
+    
+    // Fail the test if able to delete
+    result match {
+      case Success(v) => // All done
+      case Failure(e) => fail(e.getMessage)
+    }
+  }
+  
+  /**
+   * Tries to delete znodes recursively
+   */
+  private def deleteRecursive(zkUtils: ZkUtils, path: String): Try[Boolean] = {
+    info(s"Deleting $path")
+    var result: Try[Boolean] = Success(true)
+    for (child <- zkUtils.getChildren(path))
+      result = (path match {
+        case "/" => deleteRecursive(zkUtils, s"/$child")
+        case path => deleteRecursive(zkUtils, s"$path/$child")
+      }) match {
+        case Success(v) => result
+        case Failure(e) => Failure(e)
+      }
+    path match {
+      // Do not try to delete the root
+      case "/" => result
+      // For all other paths, try to delete it
+      case path =>
+        try{
+          zkUtils.deletePath(path)
+          Failure(new Exception(s"Have been able to delete $path"))
+        } catch {
+          case e: Exception => result
+        }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 2bca2cf..5fa2f65 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -21,7 +21,9 @@ import org.apache.zookeeper.server.ZooKeeperServer
 import org.apache.zookeeper.server.NIOServerCnxnFactory
 import kafka.utils.TestUtils
 import java.net.InetSocketAddress
+import javax.security.auth.login.Configuration
 import kafka.utils.CoreUtils
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils.getPort
 
 class EmbeddedZookeeper() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index eb08f58..cfcdc35 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -17,24 +17,66 @@
 
 package kafka.zk
 
+import java.util.ArrayList
+import java.util.Collection
+import javax.security.auth.login.Configuration
+
 import kafka.consumer.ConsumerConfig
 import kafka.utils.ZkUtils
 import kafka.utils.ZKCheckedEphemeral
 import kafka.utils.TestUtils
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.CreateMode
 import org.apache.zookeeper.WatchedEvent
 import org.apache.zookeeper.Watcher
 import org.apache.zookeeper.ZooDefs.Ids
 import org.I0Itec.zkclient.exception.{ZkException,ZkNodeExistsException}
-import org.junit.{Test, Assert}
+import org.junit.{After, Before, Test, Assert}
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runner.RunWith;
 
-class ZKEphemeralTest extends ZooKeeperTestHarness {
-  var zkSessionTimeoutMs = 1000
+object ZKEphemeralTest {
+  @Parameters
+  def enableSecurityOptions: Collection[Array[java.lang.Boolean]] = {
+    val list = new ArrayList[Array[java.lang.Boolean]]()
+    list.add(Array(true))
+    list.add(Array(false))
+    list
+  }
+}
 
+@RunWith(value = classOf[Parameterized])
+class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
+  val jaasFile: String = "zk-digest-jaas.conf"
+  val authProvider: String = "zookeeper.authProvider.1"
+  var zkSessionTimeoutMs = 1000
+  
+  @Before
+  override def setUp() {
+    if(secure) {
+      val classLoader = getClass.getClassLoader
+      val filePath = classLoader.getResource(jaasFile).getPath
+      System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, filePath)
+      System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
+      if(!JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))) {
+        fail("Secure access not enabled")
+     }
+    }
+    super.setUp
+  }
+  
+  @After
+  override def tearDown() {
+    super.tearDown
+    System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    System.clearProperty(authProvider)
+  }
+  
   @Test
   def testEphemeralNodeCleanup = {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled(confFile))
 
     try {
       zkUtils.createEphemeralPathExpectConflict("/tmp/zktest", "node created")
@@ -46,7 +88,7 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
     testData = zkUtils.readData("/tmp/zktest")._1
     Assert.assertNotNull(testData)
     zkUtils.close
-    zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled(confFile))
     val nodeExists = zkUtils.pathExists("/tmp/zktest")
     Assert.assertFalse(nodeExists)
   }
@@ -54,28 +96,21 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
   /*****
    ***** Tests for ZkWatchedEphemeral
    *****/
-
+  
   /**
    * Tests basic creation
    */
   @Test
   def testZkWatchedEphemeral = {
-    val path = "/zwe-test"
+    var path = "/zwe-test"
     testCreation(path)
-  }
-
-  /**
-   * Tests recursive creation
-   */
-  @Test
-  def testZkWatchedEphemeralRecursive = {
-    val path = "/zwe-test-parent/zwe-test"
+    path = "/zwe-test-parent/zwe-test"
     testCreation(path)
   }
-
+ 
   private def testCreation(path: String) {
     val zk = zkUtils.zkConnection.getZookeeper
-    val zwe = new ZKCheckedEphemeral(path, "", zk, false)
+    val zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled(confFile))
     var created = false
     var counter = 10
 
@@ -89,7 +124,7 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
     zwe.create()
     // Waits until the znode is created
     TestUtils.waitUntilTrue(() => zkUtils.pathExists(path),
-                            "Znode %s wasn't created".format(path))
+                            s"Znode $path wasn't created")
   }
 
   /**
@@ -104,7 +139,7 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
     //Creates a second session
     val (_, zkConnection2) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeoutMs, zkConnectionTimeout)
     val zk2 = zkConnection2.getZookeeper
-    var zwe = new ZKCheckedEphemeral(path, "", zk2, false)
+    var zwe = new ZKCheckedEphemeral(path, "", zk2, JaasUtils.isZkSecurityEnabled(confFile))
 
     // Creates znode for path in the first session
     zk1.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
@@ -131,7 +166,7 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
     // Creates znode for path in the first session
     zk.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
     
-    var zwe = new ZKCheckedEphemeral(path, "", zk, false)
+    var zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled(confFile))
     //Bootstraps the ZKWatchedEphemeral object
     var gotException = false;
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 38a0765..f567555 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -17,30 +17,73 @@
 
 package kafka.zk
 
+import java.io._
+import java.net._
+import javax.security.auth.login.Configuration
 import org.I0Itec.zkclient.{ZkClient, ZkConnection}
-import kafka.utils.{ZkUtils, CoreUtils}
+import kafka.utils.{ZkUtils, Logging, CoreUtils}
 import org.junit.{After, Before}
 import org.scalatest.junit.JUnitSuite
+import org.apache.kafka.common.security.JaasUtils
 
-trait ZooKeeperTestHarness extends JUnitSuite {
-  var zkPort: Int = -1
+object FourLetterWords {
+  def sendStat(host: String, port: Int, timeout: Int) {
+    val hostAddress = if (host != null)
+      new InetSocketAddress(host, port)
+    else
+      new InetSocketAddress(InetAddress.getByName(null), port)
+    val sock = new Socket()
+    var reader: BufferedReader = null
+    sock.connect(hostAddress, timeout)
+    try {
+      val outstream = sock.getOutputStream
+      outstream.write("stat".getBytes)
+      outstream.flush
+    } catch {
+      case e: SocketTimeoutException => {
+        throw new IOException("Exception while sending 4lw")
+      }
+    } finally {
+      sock.close
+      if (reader != null)
+        reader.close
+    }
+  }
+}
+
+trait ZooKeeperTestHarness extends JUnitSuite with Logging {
   var zookeeper: EmbeddedZookeeper = null
+  var zkPort: Int = -1
   var zkUtils: ZkUtils = null
   val zkConnectionTimeout = 6000
   val zkSessionTimeout = 6000
   def zkConnect: String = "127.0.0.1:" + zkPort
-
+  def confFile: String = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "")
+  
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
     zkPort = zookeeper.port
-    zkUtils = ZkUtils.apply(zkConnect, zkSessionTimeout, zkConnectionTimeout, false)
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "")))
   }
 
   @After
   def tearDown() {
-    CoreUtils.swallow(zkUtils.close())
-    CoreUtils.swallow(zookeeper.shutdown())
+    if (zkUtils != null)
+     CoreUtils.swallow(zkUtils.close())
+    if (zookeeper != null)
+      CoreUtils.swallow(zookeeper.shutdown())
+      
+    var isDown = false  
+    while(!isDown) {
+      try {
+        FourLetterWords.sendStat("127.0.0.1", zkPort, 3000)
+      } catch {
+        case _: Throwable => {
+          info("Server is down")
+          isDown = true
+        }
+      }
+    }
   }
-
 }