You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:31 UTC
[15/50] [abbrv] kafka git commit: KAFKA-2844; Separate keytabs for sasl tests
KAFKA-2844; Separate keytabs for sasl tests
Use a different keytab for server and client in SASL tests
Also:
* Improve approach used to build the JAAS files programmatically
* Delete stale `kafka_jaas.conf` file
* Move `FourLetterWords` to its own file, add `Zk` prefix and clean up its usage
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Harsha Chintalapani, Gwen Shapira
Closes #533 from ijuma/separate-keytabs-for-sasl-tests
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/89fd97f8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/89fd97f8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/89fd97f8
Branch: refs/heads/0.10.0
Commit: 89fd97f8c934b99b1ad994e36435e0686ef7b85a
Parents: a4d6f1d
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Apr 1 15:25:35 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Apr 1 15:25:35 2016 -0700
----------------------------------------------------------------------
core/src/test/resources/kafka_jaas.conf | 29 ----
.../scala/integration/kafka/api/SaslSetup.scala | 39 ++---
.../security/auth/ZkAuthorizationTest.scala | 47 ++----
.../scala/unit/kafka/utils/JaasTestUtils.scala | 156 ++++++++++++-------
.../scala/unit/kafka/zk/ZKEphemeralTest.scala | 66 ++++----
.../scala/unit/kafka/zk/ZkFourLetterWords.scala | 47 ++++++
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 48 ++----
7 files changed, 217 insertions(+), 215 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/resources/kafka_jaas.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/kafka_jaas.conf b/core/src/test/resources/kafka_jaas.conf
deleted file mode 100644
index b097e26..0000000
--- a/core/src/test/resources/kafka_jaas.conf
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-KafkaClient {
- com.sun.security.auth.module.Krb5LoginModule required debug=true
- useKeyTab=true
- storeKey=true
- serviceName="kafka"
- keyTab="$keytab-location"
- principal="client@EXAMPLE.COM";
-};
-
-KafkaServer {
- com.sun.security.auth.module.Krb5LoginModule required debug=true
- useKeyTab=true
- storeKey=true
- serviceName="kafka"
- keyTab="$keytab-location"
- principal="kafka/localhost@EXAMPLE.COM";
-};
http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 8255e6a..967cae1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -35,8 +35,7 @@ case object KafkaSasl extends SaslSetupMode
case object Both extends SaslSetupMode
/*
- * Trait used in SaslTestHarness and EndToEndAuthorizationTest
- * currently to setup a keytab and jaas files.
+ * Trait used in SaslTestHarness and EndToEndAuthorizationTest to setup keytab and jaas files.
*/
trait SaslSetup {
private val workDir = TestUtils.tempDir()
@@ -46,34 +45,26 @@ trait SaslSetup {
def startSasl(mode: SaslSetupMode = Both) {
// Important if tests leak consumers, producers or brokers
LoginManager.closeAll()
- val keytabFile = createKeytabAndSetConfiguration(mode)
+ val (serverKeytabFile, clientKeytabFile) = createKeytabsAndSetConfiguration(mode)
kdc.start()
- kdc.createPrincipal(keytabFile, "client", "kafka/localhost")
+ kdc.createPrincipal(serverKeytabFile, "kafka/localhost")
+ kdc.createPrincipal(clientKeytabFile, "client")
if (mode == Both || mode == ZkSasl)
System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
}
- protected def createKeytabAndSetConfiguration(mode: SaslSetupMode): File = {
- val (keytabFile, jaasFile) = createKeytabAndJaasFiles(mode)
+ protected def createKeytabsAndSetConfiguration(mode: SaslSetupMode): (File, File) = {
+ val serverKeytabFile = TestUtils.tempFile()
+ val clientKeytabFile = TestUtils.tempFile()
+ val jaasFile = mode match {
+ case ZkSasl => JaasTestUtils.writeZkFile()
+ case KafkaSasl => JaasTestUtils.writeKafkaFile(serverKeytabFile, clientKeytabFile)
+ case Both => JaasTestUtils.writeZkAndKafkaFiles(serverKeytabFile, clientKeytabFile)
+ }
// This will cause a reload of the Configuration singleton when `getConfiguration` is called
Configuration.setConfiguration(null)
- System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)
- keytabFile
- }
-
- private def createKeytabAndJaasFiles(mode: SaslSetupMode): (File, File) = {
- val keytabFile = TestUtils.tempFile()
- val jaasFileName: String = mode match {
- case ZkSasl =>
- JaasTestUtils.genZkFile
- case KafkaSasl =>
- JaasTestUtils.genKafkaFile(keytabFile.getAbsolutePath)
- case _ =>
- JaasTestUtils.genZkAndKafkaFile(keytabFile.getAbsolutePath)
- }
- val jaasFile = new File(jaasFileName)
-
- (keytabFile, jaasFile)
+ System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
+ (serverKeytabFile, clientKeytabFile)
}
def closeSasl() {
@@ -81,7 +72,7 @@ trait SaslSetup {
// Important if tests leak consumers, producers or brokers
LoginManager.closeAll()
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
- System.clearProperty("zookeeper.authProvider.1");
+ System.clearProperty("zookeeper.authProvider.1")
Configuration.setConfiguration(null)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 6a533b3..ab5324c 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -22,17 +22,17 @@ import kafka.utils.{Logging, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.data.{ACL, Stat}
+import org.apache.zookeeper.data.{ACL}
import org.junit.Assert._
-import org.junit.{After, Before, BeforeClass, Test}
+import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
import scala.util.{Try, Success, Failure}
import javax.security.auth.login.Configuration
+class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
+ val jaasFile = kafka.utils.JaasTestUtils.writeZkFile
+ val authProvider = "zookeeper.authProvider.1"
-class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
- val jaasFile: String = kafka.utils.JaasTestUtils.genZkFile
- val authProvider: String = "zookeeper.authProvider.1"
@Before
override def setUp() {
Configuration.setConfiguration(null)
@@ -65,12 +65,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
JaasUtils.isZkSecurityEnabled()
fail("Should have thrown an exception")
} catch {
- case e: KafkaException => {
- // Expected
- }
- case e: Exception => {
- fail(e.toString)
- }
+ case e: KafkaException => // Expected
}
}
@@ -241,10 +236,10 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
case false => list.size == 1
}
isListSizeCorrect && list.asScala.forall(
- secure match {
- case true => isAclSecure
- case false => isAclUnsecure
- })
+ secure match {
+ case true => isAclSecure
+ case false => isAclUnsecure
+ })
}
/**
@@ -255,15 +250,9 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
private def isAclSecure(acl: ACL): Boolean = {
info(s"ACL $acl")
acl.getPerms match {
- case 1 => {
- acl.getId.getScheme.equals("world")
- }
- case 31 => {
- acl.getId.getScheme.equals("sasl")
- }
- case _: Int => {
- false
- }
+ case 1 => acl.getId.getScheme.equals("world")
+ case 31 => acl.getId.getScheme.equals("sasl")
+ case _ => false
}
}
@@ -273,12 +262,8 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
private def isAclUnsecure(acl: ACL): Boolean = {
info(s"ACL $acl")
acl.getPerms match {
- case 31 => {
- acl.getId.getScheme.equals("world")
- }
- case _: Int => {
- false
- }
+ case 31 => acl.getId.getScheme.equals("world")
+ case _ => false
}
}
@@ -323,7 +308,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
case "/" => result
// For all other paths, try to delete it
case path =>
- try{
+ try {
zkUtils.deletePath(path)
Failure(new Exception(s"Have been able to delete $path"))
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index cf08830..a14cd3f 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -16,72 +16,110 @@
*/
package kafka.utils
+import java.io.{File, BufferedWriter, FileWriter}
object JaasTestUtils {
- // ZooKeeper vals
- val zkServerContextName = "Server"
- val zkClientContextName = "Client"
- val userSuperPasswd = "adminpasswd"
- val user = "fpj"
- val userPasswd = "fpjsecret"
- val zkModule = "org.apache.zookeeper.server.auth.DigestLoginModule"
- //Kafka vals
- val kafkaServerContextName = "KafkaServer"
- val kafkaClientContextName = "KafkaClient"
- val kafkaServerPrincipal = "client@EXAMPLE.COM"
- val kafkaClientPrincipal = "kafka/localhost@EXAMPLE.COM"
- val kafkaModule = "com.sun.security.auth.module.Krb5LoginModule"
-
- def genZkFile: String = {
- val jaasFile = java.io.File.createTempFile("jaas", ".conf")
- val jaasOutputStream = new java.io.FileOutputStream(jaasFile)
- writeZkToOutputStream(jaasOutputStream)
- jaasOutputStream.close()
- jaasFile.deleteOnExit()
+
+ case class Krb5LoginModule(contextName: String,
+ useKeyTab: Boolean,
+ storeKey: Boolean,
+ keyTab: String,
+ principal: String,
+ debug: Boolean,
+ serviceName: Option[String]) {
+ def toJaasSection: JaasSection = {
+ JaasSection(
+ contextName,
+ "com.sun.security.auth.module.Krb5LoginModule",
+ debug = debug,
+ entries = Map(
+ "useKeyTab" -> useKeyTab.toString,
+ "storeKey" -> storeKey.toString,
+ "keyTab" -> keyTab,
+ "principal" -> principal
+ ) ++ serviceName.map(s => Map("serviceName" -> s)).getOrElse(Map.empty)
+ )
+ }
+ }
+
+ case class JaasSection(contextName: String,
+ moduleName: String,
+ debug: Boolean,
+ entries: Map[String, String]) {
+ override def toString: String = {
+ s"""|$contextName {
+ | $moduleName required
+ | debug=$debug
+ | ${entries.map { case (k, v) => s"""$k="$v"""" }.mkString("", "\n| ", ";")}
+ |};
+ |""".stripMargin
+ }
+ }
+
+ private val ZkServerContextName = "Server"
+ private val ZkClientContextName = "Client"
+ private val ZkUserSuperPasswd = "adminpasswd"
+ private val ZkUser = "fpj"
+ private val ZkUserPassword = "fpjsecret"
+ private val ZkModule = "org.apache.zookeeper.server.auth.DigestLoginModule"
+
+ private val KafkaServerContextName = "KafkaServer"
+ private val KafkaServerPrincipal = "kafka/localhost@EXAMPLE.COM"
+ private val KafkaClientContextName = "KafkaClient"
+ private val KafkaClientPrincipal = "client@EXAMPLE.COM"
+
+ def writeZkFile(): String = {
+ val jaasFile = TestUtils.tempFile()
+ writeToFile(jaasFile, zkSections)
jaasFile.getCanonicalPath
}
-
- def genKafkaFile(keytabLocation: String): String = {
- val jaasFile = java.io.File.createTempFile("jaas", ".conf")
- val jaasOutputStream = new java.io.FileOutputStream(jaasFile)
- writeKafkaToOutputStream(jaasOutputStream, keytabLocation)
- jaasOutputStream.close()
- jaasFile.deleteOnExit()
+
+ def writeKafkaFile(serverKeyTabLocation: File, clientKeyTabLocation: File): String = {
+ val jaasFile = TestUtils.tempFile()
+ writeToFile(jaasFile, kafkaSections(serverKeyTabLocation, clientKeyTabLocation))
jaasFile.getCanonicalPath
}
-
- def genZkAndKafkaFile(keytabLocation: String): String = {
- val jaasFile = java.io.File.createTempFile("jaas", ".conf")
- val jaasOutputStream = new java.io.FileOutputStream(jaasFile)
- writeKafkaToOutputStream(jaasOutputStream, keytabLocation)
- jaasOutputStream.write("\n\n".getBytes)
- writeZkToOutputStream(jaasOutputStream)
- jaasOutputStream.close()
- jaasFile.deleteOnExit()
+
+ def writeZkAndKafkaFiles(serverKeyTabLocation: File, clientKeyTabLocation: File): String = {
+ val jaasFile = TestUtils.tempFile()
+ writeToFile(jaasFile, kafkaSections(serverKeyTabLocation, clientKeyTabLocation) ++ zkSections)
jaasFile.getCanonicalPath
}
-
- private def writeZkToOutputStream(jaasOutputStream: java.io.FileOutputStream) {
- jaasOutputStream.write(s"$zkServerContextName {\n\t$zkModule required\n".getBytes)
- jaasOutputStream.write(s"""\tuser_super="$userSuperPasswd"\n""".getBytes)
- jaasOutputStream.write(s"""\tuser_$user="$userPasswd";\n};\n\n""".getBytes)
- jaasOutputStream.write(s"""$zkClientContextName {\n\t$zkModule required\n""".getBytes)
- jaasOutputStream.write(s"""\tusername="$user"\n""".getBytes)
- jaasOutputStream.write(s"""\tpassword="$userPasswd";\n};""".getBytes)
+
+ private def zkSections: Seq[JaasSection] = Seq(
+ JaasSection(ZkServerContextName, ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)),
+ JaasSection(ZkClientContextName, ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword))
+ )
+
+ private def kafkaSections(serverKeytabLocation: File, clientKeytabLocation: File): Seq[JaasSection] = {
+ Seq(
+ Krb5LoginModule(
+ KafkaServerContextName,
+ useKeyTab = true,
+ storeKey = true,
+ keyTab = serverKeytabLocation.getAbsolutePath,
+ principal = KafkaServerPrincipal,
+ debug = true,
+ serviceName = Some("kafka")),
+ Krb5LoginModule(
+ KafkaClientContextName,
+ useKeyTab = true,
+ storeKey = true,
+ keyTab = clientKeytabLocation.getAbsolutePath,
+ principal = KafkaClientPrincipal,
+ debug = true,
+ serviceName = Some("kafka")
+ )
+ ).map(_.toJaasSection)
}
-
- private def writeKafkaToOutputStream(jaasOutputStream: java.io.FileOutputStream, keytabLocation: String) {
- jaasOutputStream.write(s"$kafkaClientContextName {\n\t$kafkaModule required debug=true\n".getBytes)
- jaasOutputStream.write(s"\tuseKeyTab=true\n".getBytes)
- jaasOutputStream.write(s"\tstoreKey=true\n".getBytes)
- jaasOutputStream.write(s"""\tserviceName="kafka"\n""".getBytes)
- jaasOutputStream.write(s"""\tkeyTab="$keytabLocation"\n""".getBytes)
- jaasOutputStream.write(s"""\tprincipal="$kafkaServerPrincipal";\n};\n\n""".getBytes)
- jaasOutputStream.write(s"""$kafkaServerContextName {\n\t$kafkaModule required debug=true\n""".getBytes)
- jaasOutputStream.write(s"\tuseKeyTab=true\n".getBytes)
- jaasOutputStream.write(s"\tstoreKey=true\n".getBytes)
- jaasOutputStream.write(s"""\tserviceName="kafka"\n""".getBytes)
- jaasOutputStream.write(s"""\tkeyTab="$keytabLocation"\n""".getBytes)
- jaasOutputStream.write(s"""\tprincipal="$kafkaClientPrincipal";\n};""".getBytes)
+
+ private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String =
+ jaasSections.mkString
+
+ private def writeToFile(file: File, jaasSections: Seq[JaasSection]) {
+ val writer = new BufferedWriter(new FileWriter(file))
+ try writer.write(jaasSectionsToString(jaasSections))
+ finally writer.close()
}
-}
\ No newline at end of file
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 32c7a5d..c2c25ed 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -17,10 +17,11 @@
package kafka.zk
-import java.util.ArrayList
-import java.util.Collection
+import java.lang.Iterable
import javax.security.auth.login.Configuration
+import scala.collection.JavaConverters._
+
import kafka.consumer.ConsumerConfig
import kafka.utils.ZkUtils
import kafka.utils.ZKCheckedEphemeral
@@ -30,26 +31,24 @@ import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.WatchedEvent
import org.apache.zookeeper.Watcher
import org.apache.zookeeper.ZooDefs.Ids
-import org.I0Itec.zkclient.exception.{ZkException,ZkNodeExistsException}
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.junit.{After, Before, Test, Assert}
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import org.junit.runner.RunWith
object ZKEphemeralTest {
+
@Parameters
- def enableSecurityOptions: Collection[Array[java.lang.Boolean]] = {
- val list = new ArrayList[Array[java.lang.Boolean]]()
- list.add(Array(true))
- list.add(Array(false))
- list
- }
+ def enableSecurityOptions: Iterable[Array[java.lang.Boolean]] =
+ Seq[Array[java.lang.Boolean]](Array(true), Array(false)).asJava
+
}
@RunWith(value = classOf[Parameterized])
class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
- val jaasFile: String = kafka.utils.JaasTestUtils.genZkFile
- val authProvider: String = "zookeeper.authProvider.1"
+ val jaasFile = kafka.utils.JaasTestUtils.writeZkFile()
+ val authProvider = "zookeeper.authProvider.1"
var zkSessionTimeoutMs = 1000
@Before
@@ -103,17 +102,14 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
*/
@Test
def testZkWatchedEphemeral = {
- var path = "/zwe-test"
- testCreation(path)
- path = "/zwe-test-parent/zwe-test"
- testCreation(path)
+ testCreation("/zwe-test")
+ testCreation("/zwe-test-parent/zwe-test")
}
private def testCreation(path: String) {
val zk = zkUtils.zkConnection.getZookeeper
val zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled())
var created = false
- var counter = 10
zk.exists(path, new Watcher() {
def process(event: WatchedEvent) {
@@ -140,19 +136,19 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
//Creates a second session
val (zkClient2, zkConnection2) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeoutMs, zkConnectionTimeout)
val zk2 = zkConnection2.getZookeeper
- var zwe = new ZKCheckedEphemeral(path, "", zk2, JaasUtils.isZkSecurityEnabled())
+ val zwe = new ZKCheckedEphemeral(path, "", zk2, JaasUtils.isZkSecurityEnabled())
// Creates znode for path in the first session
zk1.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
//Bootstraps the ZKWatchedEphemeral object
- var gotException = false;
- try {
- zwe.create()
- } catch {
- case e: ZkNodeExistsException =>
- gotException = true
- }
+ val gotException =
+ try {
+ zwe.create()
+ false
+ } catch {
+ case e: ZkNodeExistsException => true
+ }
Assert.assertTrue(gotException)
zkClient2.close()
}
@@ -168,15 +164,15 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
// Creates znode for path in the first session
zk.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
- var zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled())
+ val zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled())
//Bootstraps the ZKWatchedEphemeral object
- var gotException = false;
- try {
- zwe.create()
- } catch {
- case e: ZkNodeExistsException =>
- gotException = true
- }
+ val gotException =
+ try {
+ zwe.create()
+ false
+ } catch {
+ case e: ZkNodeExistsException => true
+ }
Assert.assertFalse(gotException)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
new file mode 100644
index 0000000..6eaee70
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import java.io.IOException
+import java.net.{SocketTimeoutException, Socket, InetAddress, InetSocketAddress}
+
+/**
+ * ZooKeeper responds to a small set of commands. Each command is composed of four letters. You issue the commands to
+ * ZooKeeper via telnet or nc, at the client port.
+ *
+ * Three of the more interesting commands: "stat" gives some general information about the server and connected
+ * clients, while "srvr" and "cons" give extended details on server and connections respectively.
+ */
+object ZkFourLetterWords {
+ def sendStat(host: String, port: Int, timeout: Int) {
+ val hostAddress =
+ if (host != null) new InetSocketAddress(host, port)
+ else new InetSocketAddress(InetAddress.getByName(null), port)
+ val sock = new Socket()
+ try {
+ sock.connect(hostAddress, timeout)
+ val outStream = sock.getOutputStream
+ outStream.write("stat".getBytes)
+ outStream.flush()
+ } catch {
+ case e: SocketTimeoutException => throw new IOException("Exception while sending 4lw")
+ } finally {
+ sock.close
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/89fd97f8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index d618ba6..95f4e35 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -17,40 +17,12 @@
package kafka.zk
-import java.io._
-import java.net._
import javax.security.auth.login.Configuration
-import org.I0Itec.zkclient.{ZkClient, ZkConnection}
import kafka.utils.{ZkUtils, Logging, CoreUtils}
import org.junit.{After, Before}
import org.scalatest.junit.JUnitSuite
import org.apache.kafka.common.security.JaasUtils
-object FourLetterWords {
- def sendStat(host: String, port: Int, timeout: Int) {
- val hostAddress = if (host != null)
- new InetSocketAddress(host, port)
- else
- new InetSocketAddress(InetAddress.getByName(null), port)
- val sock = new Socket()
- var reader: BufferedReader = null
- sock.connect(hostAddress, timeout)
- try {
- val outstream = sock.getOutputStream
- outstream.write("stat".getBytes)
- outstream.flush
- } catch {
- case e: SocketTimeoutException => {
- throw new IOException("Exception while sending 4lw")
- }
- } finally {
- sock.close
- if (reader != null)
- reader.close
- }
- }
-}
-
trait ZooKeeperTestHarness extends JUnitSuite with Logging {
var zookeeper: EmbeddedZookeeper = null
var zkPort: Int = -1
@@ -73,18 +45,20 @@ trait ZooKeeperTestHarness extends JUnitSuite with Logging {
CoreUtils.swallow(zkUtils.close())
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown())
-
- var isDown = false
- while(!isDown) {
+
+ def isDown(): Boolean = {
try {
- FourLetterWords.sendStat("127.0.0.1", zkPort, 3000)
- } catch {
- case _: Throwable => {
- info("Server is down")
- isDown = true
- }
+ ZkFourLetterWords.sendStat("127.0.0.1", zkPort, 3000)
+ false
+ } catch { case _: Throwable =>
+ debug("Server is down")
+ true
}
}
+
+ Iterator.continually(isDown()).exists(identity)
+
Configuration.setConfiguration(null)
}
+
}