You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:31 UTC

[15/50] [abbrv] kafka git commit: KAFKA-2844; Separate keytabs for sasl tests

KAFKA-2844; Separate keytabs for sasl tests

Use a different keytab for server and client in SASL tests

Also:
* Improve approach used to build the JAAS files programmatically
* Delete stale `kafka_jaas.conf` file
* Move `FourLetterWords` to its own file, add `Zk` prefix and clean-up its usage

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Harsha Chintalapani, Gwen Shapira

Closes #533 from ijuma/separate-keytabs-for-sasl-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/89fd97f8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/89fd97f8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/89fd97f8

Branch: refs/heads/0.10.0
Commit: 89fd97f8c934b99b1ad994e36435e0686ef7b85a
Parents: a4d6f1d
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Apr 1 15:25:35 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Apr 1 15:25:35 2016 -0700

----------------------------------------------------------------------
 core/src/test/resources/kafka_jaas.conf         |  29 ----
 .../scala/integration/kafka/api/SaslSetup.scala |  39 ++---
 .../security/auth/ZkAuthorizationTest.scala     |  47 ++----
 .../scala/unit/kafka/utils/JaasTestUtils.scala  | 156 ++++++++++++-------
 .../scala/unit/kafka/zk/ZKEphemeralTest.scala   |  66 ++++----
 .../scala/unit/kafka/zk/ZkFourLetterWords.scala |  47 ++++++
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |  48 ++----
 7 files changed, 217 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/resources/kafka_jaas.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/kafka_jaas.conf b/core/src/test/resources/kafka_jaas.conf
deleted file mode 100644
index b097e26..0000000
--- a/core/src/test/resources/kafka_jaas.conf
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
-  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
-  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
-  * License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations under the License.
-  */
-KafkaClient {
-	com.sun.security.auth.module.Krb5LoginModule required debug=true
-	useKeyTab=true
-	storeKey=true
-	serviceName="kafka"
-	keyTab="$keytab-location"
-	principal="client@EXAMPLE.COM";
-};
-
-KafkaServer {
-	com.sun.security.auth.module.Krb5LoginModule required debug=true
-	useKeyTab=true
-	storeKey=true
-	serviceName="kafka"
-	keyTab="$keytab-location"
-	principal="kafka/localhost@EXAMPLE.COM";
-};

http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 8255e6a..967cae1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -35,8 +35,7 @@ case object KafkaSasl extends SaslSetupMode
 case object Both extends SaslSetupMode
 
 /*
- * Trait used in SaslTestHarness and EndToEndAuthorizationTest
- * currently to setup a keytab and jaas files.
+ * Trait used in SaslTestHarness and EndToEndAuthorizationTest to setup keytab and jaas files.
  */
 trait SaslSetup {
   private val workDir = TestUtils.tempDir()
@@ -46,34 +45,26 @@ trait SaslSetup {
   def startSasl(mode: SaslSetupMode = Both) {
     // Important if tests leak consumers, producers or brokers
     LoginManager.closeAll()
-    val keytabFile = createKeytabAndSetConfiguration(mode)
+    val (serverKeytabFile, clientKeytabFile) = createKeytabsAndSetConfiguration(mode)
     kdc.start()
-    kdc.createPrincipal(keytabFile, "client", "kafka/localhost")
+    kdc.createPrincipal(serverKeytabFile, "kafka/localhost")
+    kdc.createPrincipal(clientKeytabFile, "client")
     if (mode == Both || mode == ZkSasl)
       System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
   }
 
-  protected def createKeytabAndSetConfiguration(mode: SaslSetupMode): File = {
-    val (keytabFile, jaasFile) = createKeytabAndJaasFiles(mode)
+  protected def createKeytabsAndSetConfiguration(mode: SaslSetupMode): (File, File) = {
+    val serverKeytabFile = TestUtils.tempFile()
+    val clientKeytabFile = TestUtils.tempFile()
+    val jaasFile = mode match {
+      case ZkSasl => JaasTestUtils.writeZkFile()
+      case KafkaSasl => JaasTestUtils.writeKafkaFile(serverKeytabFile, clientKeytabFile)
+      case Both => JaasTestUtils.writeZkAndKafkaFiles(serverKeytabFile, clientKeytabFile)
+    }
     // This will cause a reload of the Configuration singleton when `getConfiguration` is called
     Configuration.setConfiguration(null)
-    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)
-    keytabFile
-  }
-
-  private def createKeytabAndJaasFiles(mode: SaslSetupMode): (File, File) = {
-    val keytabFile = TestUtils.tempFile()
-    val jaasFileName: String = mode match {
-      case ZkSasl =>
-        JaasTestUtils.genZkFile
-      case KafkaSasl =>
-        JaasTestUtils.genKafkaFile(keytabFile.getAbsolutePath)
-      case _ =>
-        JaasTestUtils.genZkAndKafkaFile(keytabFile.getAbsolutePath)
-    }
-    val jaasFile = new File(jaasFileName)
-
-    (keytabFile, jaasFile)
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
+    (serverKeytabFile, clientKeytabFile)
   }
 
   def closeSasl() {
@@ -81,7 +72,7 @@ trait SaslSetup {
     // Important if tests leak consumers, producers or brokers
     LoginManager.closeAll()
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
-    System.clearProperty("zookeeper.authProvider.1");
+    System.clearProperty("zookeeper.authProvider.1")
     Configuration.setConfiguration(null)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 6a533b3..ab5324c 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -22,17 +22,17 @@ import kafka.utils.{Logging, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.data.{ACL, Stat}
+import org.apache.zookeeper.data.{ACL}
 import org.junit.Assert._
-import org.junit.{After, Before, BeforeClass, Test}
+import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
 import scala.util.{Try, Success, Failure}
 import javax.security.auth.login.Configuration
 
+class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
+  val jaasFile = kafka.utils.JaasTestUtils.writeZkFile
+  val authProvider = "zookeeper.authProvider.1"
 
-class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
-  val jaasFile: String = kafka.utils.JaasTestUtils.genZkFile
-  val authProvider: String = "zookeeper.authProvider.1"
   @Before
   override def setUp() {
     Configuration.setConfiguration(null)
@@ -65,12 +65,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
       JaasUtils.isZkSecurityEnabled()
       fail("Should have thrown an exception")
     } catch {
-      case e: KafkaException => {
-        // Expected
-      }
-      case e: Exception => {
-        fail(e.toString)
-      }
+      case e: KafkaException => // Expected
     }
   }
 
@@ -241,10 +236,10 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
       case false => list.size == 1
     } 
     isListSizeCorrect && list.asScala.forall(
-        secure match {
-          case true => isAclSecure
-          case false => isAclUnsecure
-        })
+      secure match {
+        case true => isAclSecure
+        case false => isAclUnsecure
+      })
   }
   
   /**
@@ -255,15 +250,9 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
   private def isAclSecure(acl: ACL): Boolean = {
     info(s"ACL $acl")
     acl.getPerms match {
-      case 1 => {
-        acl.getId.getScheme.equals("world")
-      }
-      case 31 => {
-        acl.getId.getScheme.equals("sasl")
-      }
-      case _: Int => {
-        false
-      }
+      case 1 => acl.getId.getScheme.equals("world")
+      case 31 => acl.getId.getScheme.equals("sasl")
+      case _ => false
     }
   }
   
@@ -273,12 +262,8 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
   private def isAclUnsecure(acl: ACL): Boolean = {
     info(s"ACL $acl")
     acl.getPerms match {
-      case 31 => {
-        acl.getId.getScheme.equals("world")
-      }
-      case _: Int => {
-        false
-      }
+      case 31 => acl.getId.getScheme.equals("world")
+      case _ => false
     }
   }
   
@@ -323,7 +308,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
       case "/" => result
       // For all other paths, try to delete it
       case path =>
-        try{
+        try {
           zkUtils.deletePath(path)
           Failure(new Exception(s"Have been able to delete $path"))
         } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index cf08830..a14cd3f 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -16,72 +16,110 @@
  */
 package kafka.utils
 
+import java.io.{File, BufferedWriter, FileWriter}
 
 object JaasTestUtils {
-  // ZooKeeper vals
-  val zkServerContextName = "Server"
-  val zkClientContextName = "Client"
-  val userSuperPasswd = "adminpasswd"
-  val user = "fpj"
-  val userPasswd = "fpjsecret"
-  val zkModule = "org.apache.zookeeper.server.auth.DigestLoginModule"
-  //Kafka vals
-  val kafkaServerContextName = "KafkaServer"
-  val kafkaClientContextName = "KafkaClient"
-  val kafkaServerPrincipal = "client@EXAMPLE.COM"
-  val kafkaClientPrincipal = "kafka/localhost@EXAMPLE.COM"
-  val kafkaModule = "com.sun.security.auth.module.Krb5LoginModule"
-  
-  def genZkFile: String = {
-    val jaasFile = java.io.File.createTempFile("jaas", ".conf")
-    val jaasOutputStream = new java.io.FileOutputStream(jaasFile)
-    writeZkToOutputStream(jaasOutputStream)
-    jaasOutputStream.close()
-    jaasFile.deleteOnExit()
+
+  case class Krb5LoginModule(contextName: String,
+                             useKeyTab: Boolean,
+                             storeKey: Boolean,
+                             keyTab: String,
+                             principal: String,
+                             debug: Boolean,
+                             serviceName: Option[String]) {
+    def toJaasSection: JaasSection = {
+      JaasSection(
+        contextName,
+        "com.sun.security.auth.module.Krb5LoginModule",
+        debug = debug,
+        entries = Map(
+          "useKeyTab" -> useKeyTab.toString,
+          "storeKey" -> storeKey.toString,
+          "keyTab" -> keyTab,
+          "principal" -> principal
+        ) ++ serviceName.map(s => Map("serviceName" -> s)).getOrElse(Map.empty)
+      )
+    }
+  }
+
+  case class JaasSection(contextName: String,
+                         moduleName: String,
+                         debug: Boolean,
+                         entries: Map[String, String]) {
+    override def toString: String = {
+      s"""|$contextName {
+          |  $moduleName required
+          |  debug=$debug
+          |  ${entries.map { case (k, v) => s"""$k="$v"""" }.mkString("", "\n|  ", ";")}
+          |};
+          |""".stripMargin
+    }
+  }
+
+  private val ZkServerContextName = "Server"
+  private val ZkClientContextName = "Client"
+  private val ZkUserSuperPasswd = "adminpasswd"
+  private val ZkUser = "fpj"
+  private val ZkUserPassword = "fpjsecret"
+  private val ZkModule = "org.apache.zookeeper.server.auth.DigestLoginModule"
+
+  private val KafkaServerContextName = "KafkaServer"
+  private val KafkaServerPrincipal = "kafka/localhost@EXAMPLE.COM"
+  private val KafkaClientContextName = "KafkaClient"
+  private val KafkaClientPrincipal = "client@EXAMPLE.COM"
+
+  def writeZkFile(): String = {
+    val jaasFile = TestUtils.tempFile()
+    writeToFile(jaasFile, zkSections)
     jaasFile.getCanonicalPath
   }
-  
-  def genKafkaFile(keytabLocation: String): String = {
-    val jaasFile = java.io.File.createTempFile("jaas", ".conf")
-    val jaasOutputStream = new java.io.FileOutputStream(jaasFile)
-    writeKafkaToOutputStream(jaasOutputStream, keytabLocation)
-    jaasOutputStream.close()
-    jaasFile.deleteOnExit()
+
+  def writeKafkaFile(serverKeyTabLocation: File, clientKeyTabLocation: File): String = {
+    val jaasFile = TestUtils.tempFile()
+    writeToFile(jaasFile, kafkaSections(serverKeyTabLocation, clientKeyTabLocation))
     jaasFile.getCanonicalPath
   }
-  
-  def genZkAndKafkaFile(keytabLocation: String): String = {
-    val jaasFile = java.io.File.createTempFile("jaas", ".conf")
-    val jaasOutputStream = new java.io.FileOutputStream(jaasFile)
-    writeKafkaToOutputStream(jaasOutputStream, keytabLocation)
-    jaasOutputStream.write("\n\n".getBytes)
-    writeZkToOutputStream(jaasOutputStream)
-    jaasOutputStream.close()
-    jaasFile.deleteOnExit()
+
+  def writeZkAndKafkaFiles(serverKeyTabLocation: File, clientKeyTabLocation: File): String = {
+    val jaasFile = TestUtils.tempFile()
+    writeToFile(jaasFile, kafkaSections(serverKeyTabLocation, clientKeyTabLocation) ++ zkSections)
     jaasFile.getCanonicalPath
   }
-  
-  private def writeZkToOutputStream(jaasOutputStream: java.io.FileOutputStream) {
-    jaasOutputStream.write(s"$zkServerContextName {\n\t$zkModule required\n".getBytes)
-    jaasOutputStream.write(s"""\tuser_super="$userSuperPasswd"\n""".getBytes)
-    jaasOutputStream.write(s"""\tuser_$user="$userPasswd";\n};\n\n""".getBytes)
-    jaasOutputStream.write(s"""$zkClientContextName {\n\t$zkModule required\n""".getBytes)
-    jaasOutputStream.write(s"""\tusername="$user"\n""".getBytes)
-    jaasOutputStream.write(s"""\tpassword="$userPasswd";\n};""".getBytes)
+
+  private def zkSections: Seq[JaasSection] = Seq(
+    JaasSection(ZkServerContextName, ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)),
+    JaasSection(ZkClientContextName, ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword))
+  )
+
+  private def kafkaSections(serverKeytabLocation: File, clientKeytabLocation: File): Seq[JaasSection] = {
+    Seq(
+      Krb5LoginModule(
+        KafkaServerContextName,
+        useKeyTab = true,
+        storeKey = true,
+        keyTab = serverKeytabLocation.getAbsolutePath,
+        principal = KafkaServerPrincipal,
+        debug = true,
+        serviceName = Some("kafka")),
+      Krb5LoginModule(
+        KafkaClientContextName,
+        useKeyTab = true,
+        storeKey = true,
+        keyTab = clientKeytabLocation.getAbsolutePath,
+        principal = KafkaClientPrincipal,
+        debug = true,
+        serviceName = Some("kafka")
+      )
+    ).map(_.toJaasSection)
   }
-  
-  private def writeKafkaToOutputStream(jaasOutputStream: java.io.FileOutputStream, keytabLocation: String) {
-    jaasOutputStream.write(s"$kafkaClientContextName {\n\t$kafkaModule required debug=true\n".getBytes)
-    jaasOutputStream.write(s"\tuseKeyTab=true\n".getBytes)
-    jaasOutputStream.write(s"\tstoreKey=true\n".getBytes)
-    jaasOutputStream.write(s"""\tserviceName="kafka"\n""".getBytes)
-    jaasOutputStream.write(s"""\tkeyTab="$keytabLocation"\n""".getBytes)
-    jaasOutputStream.write(s"""\tprincipal="$kafkaServerPrincipal";\n};\n\n""".getBytes)
-    jaasOutputStream.write(s"""$kafkaServerContextName {\n\t$kafkaModule required debug=true\n""".getBytes)
-    jaasOutputStream.write(s"\tuseKeyTab=true\n".getBytes)
-    jaasOutputStream.write(s"\tstoreKey=true\n".getBytes)
-    jaasOutputStream.write(s"""\tserviceName="kafka"\n""".getBytes)
-    jaasOutputStream.write(s"""\tkeyTab="$keytabLocation"\n""".getBytes)
-    jaasOutputStream.write(s"""\tprincipal="$kafkaClientPrincipal";\n};""".getBytes)
+
+  private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String =
+    jaasSections.mkString
+
+  private def writeToFile(file: File, jaasSections: Seq[JaasSection]) {
+    val writer = new BufferedWriter(new FileWriter(file))
+    try writer.write(jaasSectionsToString(jaasSections))
+    finally writer.close()
   }
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 32c7a5d..c2c25ed 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -17,10 +17,11 @@
 
 package kafka.zk
 
-import java.util.ArrayList
-import java.util.Collection
+import java.lang.Iterable
 import javax.security.auth.login.Configuration
 
+import scala.collection.JavaConverters._
+
 import kafka.consumer.ConsumerConfig
 import kafka.utils.ZkUtils
 import kafka.utils.ZKCheckedEphemeral
@@ -30,26 +31,24 @@ import org.apache.zookeeper.CreateMode
 import org.apache.zookeeper.WatchedEvent
 import org.apache.zookeeper.Watcher
 import org.apache.zookeeper.ZooDefs.Ids
-import org.I0Itec.zkclient.exception.{ZkException,ZkNodeExistsException}
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.junit.{After, Before, Test, Assert}
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import org.junit.runner.RunWith
 
 object ZKEphemeralTest {
+
   @Parameters
-  def enableSecurityOptions: Collection[Array[java.lang.Boolean]] = {
-    val list = new ArrayList[Array[java.lang.Boolean]]()
-    list.add(Array(true))
-    list.add(Array(false))
-    list
-  }
+  def enableSecurityOptions: Iterable[Array[java.lang.Boolean]] =
+    Seq[Array[java.lang.Boolean]](Array(true), Array(false)).asJava
+
 }
 
 @RunWith(value = classOf[Parameterized])
 class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
-  val jaasFile: String = kafka.utils.JaasTestUtils.genZkFile
-  val authProvider: String = "zookeeper.authProvider.1"
+  val jaasFile = kafka.utils.JaasTestUtils.writeZkFile()
+  val authProvider = "zookeeper.authProvider.1"
   var zkSessionTimeoutMs = 1000
   
   @Before
@@ -103,17 +102,14 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
    */
   @Test
   def testZkWatchedEphemeral = {
-    var path = "/zwe-test"
-    testCreation(path)
-    path = "/zwe-test-parent/zwe-test"
-    testCreation(path)
+    testCreation("/zwe-test")
+    testCreation("/zwe-test-parent/zwe-test")
   }
  
   private def testCreation(path: String) {
     val zk = zkUtils.zkConnection.getZookeeper
     val zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled())
     var created = false
-    var counter = 10
 
     zk.exists(path, new Watcher() {
       def process(event: WatchedEvent) {
@@ -140,19 +136,19 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
     //Creates a second session
     val (zkClient2, zkConnection2) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeoutMs, zkConnectionTimeout)
     val zk2 = zkConnection2.getZookeeper
-    var zwe = new ZKCheckedEphemeral(path, "", zk2, JaasUtils.isZkSecurityEnabled())
+    val zwe = new ZKCheckedEphemeral(path, "", zk2, JaasUtils.isZkSecurityEnabled())
 
     // Creates znode for path in the first session
     zk1.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
     
     //Bootstraps the ZKWatchedEphemeral object
-    var gotException = false;
-    try {
-      zwe.create()
-    } catch {
-      case e: ZkNodeExistsException =>
-        gotException = true
-    }
+    val gotException =
+      try {
+        zwe.create()
+        false
+      } catch {
+        case e: ZkNodeExistsException => true
+      }
     Assert.assertTrue(gotException)
     zkClient2.close()
   }
@@ -168,15 +164,15 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
     // Creates znode for path in the first session
     zk.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
     
-    var zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled())
+    val zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled())
     //Bootstraps the ZKWatchedEphemeral object
-    var gotException = false;
-    try {
-      zwe.create()
-    } catch {
-      case e: ZkNodeExistsException =>
-        gotException = true
-    }
+    val gotException =
+      try {
+        zwe.create()
+        false
+      } catch {
+        case e: ZkNodeExistsException => true
+      }
     Assert.assertFalse(gotException)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
new file mode 100644
index 0000000..6eaee70
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
@@ -0,0 +1,47 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.zk
+
+import java.io.IOException
+import java.net.{SocketTimeoutException, Socket, InetAddress, InetSocketAddress}
+
+/**
+  * ZooKeeper responds to a small set of commands. Each command is composed of four letters. You issue the commands to
+  * ZooKeeper via telnet or nc, at the client port.
+  *
+  * Three of the more interesting commands: "stat" gives some general information about the server and connected
+  * clients, while "srvr" and "cons" give extended details on server and connections respectively.
+  */
+object ZkFourLetterWords {
+  def sendStat(host: String, port: Int, timeout: Int) {
+    val hostAddress =
+      if (host != null) new InetSocketAddress(host, port)
+      else new InetSocketAddress(InetAddress.getByName(null), port)
+    val sock = new Socket()
+    try {
+      sock.connect(hostAddress, timeout)
+      val outStream = sock.getOutputStream
+      outStream.write("stat".getBytes)
+      outStream.flush()
+    } catch {
+      case e: SocketTimeoutException => throw new IOException("Exception while sending 4lw")
+    } finally {
+      sock.close
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index d618ba6..95f4e35 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -17,40 +17,12 @@
 
 package kafka.zk
 
-import java.io._
-import java.net._
 import javax.security.auth.login.Configuration
-import org.I0Itec.zkclient.{ZkClient, ZkConnection}
 import kafka.utils.{ZkUtils, Logging, CoreUtils}
 import org.junit.{After, Before}
 import org.scalatest.junit.JUnitSuite
 import org.apache.kafka.common.security.JaasUtils
 
-object FourLetterWords {
-  def sendStat(host: String, port: Int, timeout: Int) {
-    val hostAddress = if (host != null)
-      new InetSocketAddress(host, port)
-    else
-      new InetSocketAddress(InetAddress.getByName(null), port)
-    val sock = new Socket()
-    var reader: BufferedReader = null
-    sock.connect(hostAddress, timeout)
-    try {
-      val outstream = sock.getOutputStream
-      outstream.write("stat".getBytes)
-      outstream.flush
-    } catch {
-      case e: SocketTimeoutException => {
-        throw new IOException("Exception while sending 4lw")
-      }
-    } finally {
-      sock.close
-      if (reader != null)
-        reader.close
-    }
-  }
-}
-
 trait ZooKeeperTestHarness extends JUnitSuite with Logging {
   var zookeeper: EmbeddedZookeeper = null
   var zkPort: Int = -1
@@ -73,18 +45,20 @@ trait ZooKeeperTestHarness extends JUnitSuite with Logging {
      CoreUtils.swallow(zkUtils.close())
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown())
-      
-    var isDown = false  
-    while(!isDown) {
+
+    def isDown(): Boolean = {
       try {
-        FourLetterWords.sendStat("127.0.0.1", zkPort, 3000)
-      } catch {
-        case _: Throwable => {
-          info("Server is down")
-          isDown = true
-        }
+        ZkFourLetterWords.sendStat("127.0.0.1", zkPort, 3000)
+        false
+      } catch { case _: Throwable =>
+        debug("Server is down")
+        true
       }
     }
+
+    Iterator.continually(isDown()).exists(identity)
+
     Configuration.setConfiguration(null)
   }
+
 }