You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/28 21:20:33 UTC
kafka git commit: KAFKA-2640; Add tests for ZK authentication
Repository: kafka
Updated Branches:
refs/heads/trunk 8838fa801 -> 983b1f9e1
KAFKA-2640; Add tests for ZK authentication
I've added a couple of initial tests to verify the functionality. I've tested that the JAAS config file loads properly and SASL with DIGEST-MD5 works with ZooKeeper.
Author: Flavio Junqueira <fp...@apache.org>
Author: flavio junqueira <fp...@apache.org>
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #324 from fpj/KAFKA-2640
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/983b1f9e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/983b1f9e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/983b1f9e
Branch: refs/heads/trunk
Commit: 983b1f9e17a0f392cfb7785f572c767540a12e2b
Parents: 8838fa8
Author: Flavio Junqueira <fp...@apache.org>
Authored: Wed Oct 28 13:20:26 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Oct 28 13:20:26 2015 -0700
----------------------------------------------------------------------
.../apache/kafka/common/security/JaasUtils.java | 3 +-
.../scala/kafka/admin/ZkSecurityMigrator.scala | 14 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 17 +-
core/src/test/resources/zk-digest-jaas.conf | 11 +
.../security/auth/ZkAuthorizationTest.scala | 317 +++++++++++++++++++
.../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 2 +
.../scala/unit/kafka/zk/ZKEphemeralTest.scala | 75 +++--
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 59 +++-
8 files changed, 460 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
index dade986..c081a76 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
@@ -87,7 +87,7 @@ public class JaasUtils {
public static boolean isZkSecurityEnabled(String loginConfigFile) {
boolean isSecurityEnabled = false;
- boolean zkSaslEnabled = Boolean.getBoolean(System.getProperty(ZK_SASL_CLIENT, "true"));
+ boolean zkSaslEnabled = Boolean.parseBoolean(System.getProperty(ZK_SASL_CLIENT, "true"));
String zkLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, "Client");
if (loginConfigFile != null && loginConfigFile.length() > 0) {
@@ -95,6 +95,7 @@ public class JaasUtils {
if (!configFile.canRead()) {
throw new KafkaException("File " + loginConfigFile + "cannot be read.");
}
+
try {
URI configUri = configFile.toURI();
Configuration loginConf = Configuration.getInstance("JavaLoginConfig", new URIParameter(configUri));
http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index fce5c03..e3ab7f2 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -68,7 +68,6 @@ object ZkSecurityMigrator extends Logging {
def run(args: Array[String]) {
var jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
val parser = new OptionParser()
-
val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure."
+ " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String])
val jaasFileOpt = parser.accepts("jaas.file", "JAAS Config file.").withOptionalArg().ofType(classOf[String])
@@ -157,8 +156,12 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
Code.get(rc) match {
case Code.OK =>
// Set ACL for each child
- for (child <- children.asScala)
- setAclsRecursively(s"$path/$child")
+ children.asScala.map { child =>
+ path match {
+ case "/" => s"/$child"
+ case path => s"$path/$child"
+ }
+ }.foreach(setAclsRecursively)
promise success "done"
case Code.CONNECTIONLOSS =>
zkHandle.getChildren(path, false, GetChildrenCallback, ctx)
@@ -211,9 +214,9 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
for (path <- zkUtils.securePersistentZkPaths) {
debug("Going to set ACL for %s".format(path))
zkUtils.makeSurePersistentPathExists(path)
- setAclsRecursively(path)
}
-
+ setAclsRecursively("/")
+
@tailrec
def recurse(): Unit = {
val future = futures.synchronized {
@@ -233,5 +236,4 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
zkUtils.close
}
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index a39e61c..30d45a3 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -91,7 +91,8 @@ object ZkUtils {
}
def DefaultAcls(isSecure: Boolean): java.util.List[ACL] = if (isSecure) {
- val list = ZooDefs.Ids.CREATOR_ALL_ACL
+ val list = new java.util.ArrayList[ACL]
+ list.addAll(ZooDefs.Ids.CREATOR_ALL_ACL)
list.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)
list
} else {
@@ -963,6 +964,9 @@ class ZKCheckedEphemeral(path: String,
case Code.SESSIONEXPIRED =>
error("Session has expired while creating %s".format(path))
setResult(Code.SESSIONEXPIRED)
+ case Code.INVALIDACL =>
+ error("Invalid ACL")
+ setResult(Code.INVALIDACL)
case _ =>
warn("ZooKeeper event while creating registration node: %s %s".format(path, Code.get(rc)))
setResult(Code.get(rc))
@@ -988,6 +992,9 @@ class ZKCheckedEphemeral(path: String,
case Code.SESSIONEXPIRED =>
error("Session has expired while reading znode %s".format(path))
setResult(Code.SESSIONEXPIRED)
+ case Code.INVALIDACL =>
+ error("Invalid ACL")
+ setResult(Code.INVALIDACL)
case _ =>
warn("ZooKeeper event while getting znode data: %s %s".format(path, Code.get(rc)))
setResult(Code.get(rc))
@@ -1011,7 +1018,7 @@ class ZKCheckedEphemeral(path: String,
} else {
zkHandle.create(prefix,
new Array[Byte](0),
- ZkUtils. DefaultAcls(isSecure),
+ DefaultAcls(isSecure),
CreateMode.PERSISTENT,
new StringCallback() {
def processResult(rc : Int,
@@ -1031,6 +1038,9 @@ class ZKCheckedEphemeral(path: String,
case Code.SESSIONEXPIRED =>
error("Session has expired while creating %s".format(path))
setResult(Code.get(rc))
+ case Code.INVALIDACL =>
+ error("Invalid ACL")
+ setResult(Code.INVALIDACL)
case _ =>
warn("ZooKeeper event while creating registration node: %s %s".format(path, Code.get(rc)))
setResult(Code.get(rc))
@@ -1068,7 +1078,8 @@ class ZKCheckedEphemeral(path: String,
}
val prefix = path.substring(0, index)
val suffix = path.substring(index, path.length)
- debug("Path: %s, Prefix: %s, Suffix: %s".format(path, prefix, suffix))
+ debug(s"Path: $path, Prefix: $prefix, Suffix: $suffix")
+ info(s"Creating $path (is it secure? $isSecure)")
createRecursive(prefix, suffix)
val result = waitUntilResolved()
info("Result of znode creation is: %s".format(result))
http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/test/resources/zk-digest-jaas.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/zk-digest-jaas.conf b/core/src/test/resources/zk-digest-jaas.conf
new file mode 100644
index 0000000..6020cd2
--- /dev/null
+++ b/core/src/test/resources/zk-digest-jaas.conf
@@ -0,0 +1,11 @@
+Server {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_super="adminsecret"
+ user_fpj="fpjsecret";
+};
+
+Client {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="fpj"
+ password="fpjsecret";
+};
http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
new file mode 100644
index 0000000..5f3c4ce
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security.auth
+
+import kafka.admin.ZkSecurityMigrator
+import kafka.utils.{Logging, ZkUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.data.{ACL, Stat}
+import org.junit.Assert._
+import org.junit.{After, Before, BeforeClass, Test}
+import scala.collection.JavaConverters._
+import scala.util.{Try, Success, Failure}
+
+
+class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
+ val jaasFile: String = "zk-digest-jaas.conf"
+ val authProvider: String = "zookeeper.authProvider.1"
+ @Before
+  override def setUp() {
+    // The JAAS file and the SASL auth provider must be registered as system
+    // properties before the harness creates the embedded server and client.
+    val jaasPath = getClass.getClassLoader.getResource(jaasFile).getPath
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasPath)
+    System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
+    super.setUp()
+  }
+
+ @After
+  override def tearDown() {
+    // The security settings are JVM-wide system properties. Clear them even
+    // when shutting the harness down throws, so that they cannot leak into
+    // test classes that run afterwards in the same JVM.
+    try {
+      super.tearDown()
+    } finally {
+      System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+      System.clearProperty(authProvider)
+    }
+  }
+
+ /**
+ * Tests the method in JaasUtils that checks whether to use
+ * secure ACLs and authentication with ZooKeeper.
+ */
+ @Test
+  def testIsZkSecurityEnabled() {
+    val configuredFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    assertTrue(JaasUtils.isZkSecurityEnabled(configuredFile))
+    assertFalse(JaasUtils.isZkSecurityEnabled(""))
+
+    // A JAAS file that cannot be read must be rejected with a KafkaException.
+    val thrown: Option[Exception] =
+      try {
+        JaasUtils.isZkSecurityEnabled("no-such-file-exists.conf")
+        None
+      } catch {
+        case e: Exception => Some(e)
+      }
+    thrown match {
+      case Some(_: KafkaException) => // Expected
+      case Some(other) => fail(other.toString)
+      case None => fail("Should have thrown an exception")
+    }
+  }
+
+ /**
+ * Exercises the code in ZkUtils. The goal is mainly
+ * to verify that the behavior of ZkUtils is correct
+ * when isSecure is set to true.
+ */
+ @Test
+  def testZkUtils() {
+    assertTrue(zkUtils.isSecure)
+    for (path <- zkUtils.persistentZkPaths) {
+      zkUtils.makeSurePersistentPathExists(path)
+      if(!path.equals(ZkUtils.ConsumersPath)) {
+        val aclList = (zkUtils.zkConnection.getAcl(path)).getKey
+        assertEquals(s"Unexpected number of ACLs for $path", 2, aclList.size)
+        for (acl: ACL <- aclList.asScala) {
+          assertTrue(s"Unexpected ACL $acl for $path", isAclSecure(acl))
+        }
+      }
+    }
+    // verify() only returns a Boolean; it does not assert anything itself.
+    // Its result has to be checked here, otherwise a znode created with the
+    // wrong ACL would go unnoticed.
+    // Test that can create: createEphemeralPathExpectConflict
+    zkUtils.createEphemeralPathExpectConflict("/a", "")
+    assertTrue("ACL of /a is not secure", verify("/a"))
+    // Test that can create: createPersistentPath
+    zkUtils.createPersistentPath("/b")
+    assertTrue("ACL of /b is not secure", verify("/b"))
+    // Test that can create: createSequentialPersistentPath
+    val seqPath = zkUtils.createSequentialPersistentPath("/c", "")
+    assertTrue(s"ACL of $seqPath is not secure", verify(seqPath))
+    // Test that can update: updateEphemeralPath
+    zkUtils.updateEphemeralPath("/a", "updated")
+    val valueA: String = zkUtils.zkClient.readData("/a")
+    assertEquals("updated", valueA)
+    // Test that can update: updatePersistentPath
+    zkUtils.updatePersistentPath("/b", "updated")
+    val valueB: String = zkUtils.zkClient.readData("/b")
+    assertEquals("updated", valueB)
+
+    info("Leaving testZkUtils")
+  }
+
+ /**
+ * Tests the migration tool when making an unsecure
+ * cluster secure.
+ */
+ @Test
+  def testZkMigration() {
+    // Populate through a client created with isSecure = false, then check
+    // the result of the migration through the harness client.
+    val plainZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
+    try testMigration(plainZkUtils, zkUtils)
+    finally plainZkUtils.close()
+  }
+
+ /**
+ * Tests the migration tool when making a secure
+ * cluster unsecure.
+ */
+ @Test
+  def testZkAntiMigration() {
+    // Populate through the harness client, then check the result of the
+    // migration through a client created with isSecure = false.
+    val plainZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
+    try testMigration(zkUtils, plainZkUtils)
+    finally plainZkUtils.close()
+  }
+
+ /**
+ * Tests that the persistent paths cannot be deleted.
+ */
+ @Test
+  def testDelete() {
+    info(s"zkConnect string: $zkConnect")
+    // Secure the znodes first, then try to remove them without credentials.
+    val migratorArgs = Array("--zookeeper.acl=secure", s"--zookeeper.connect=$zkConnect")
+    ZkSecurityMigrator.run(migratorArgs)
+    deleteAllUnsecure()
+  }
+
+ /**
+ * Tests that znodes cannot be deleted when the
+ * persistent paths have children.
+ */
+ @Test
+  def testDeleteRecursive() {
+    info(s"zkConnect string: $zkConnect")
+    // Give every secure persistent path one child znode.
+    zkUtils.securePersistentZkPaths.foreach { parent =>
+      info(s"Creating $parent")
+      zkUtils.makeSurePersistentPathExists(parent)
+      zkUtils.createPersistentPath(s"$parent/fpjwashere", "")
+    }
+    zkUtils.zkConnection.setAcl("/", zkUtils.DefaultAcls, -1)
+    deleteAllUnsecure()
+  }
+
+ /**
+ * Tests the migration tool when chroot is being used.
+ */
+ @Test
+  def testChroot() {
+    zkUtils.createPersistentPath("/kafka")
+    val unsecureZkUtils = ZkUtils(zkConnect + "/kafka", 6000, 6000, false)
+    // The try blocks are nested so that each client is closed no matter
+    // where a failure happens: while creating the second client, during the
+    // migration itself, or while closing the other client.
+    try {
+      val secureZkUtils = ZkUtils(zkConnect + "/kafka", 6000, 6000, true)
+      try {
+        testMigration(unsecureZkUtils, secureZkUtils)
+      } finally {
+        secureZkUtils.close()
+      }
+    } finally {
+      unsecureZkUtils.close()
+    }
+  }
+
+ /**
+   * Exercises the migration tool. It is used by three test cases:
+   * testZkMigration, testZkAntiMigration and testChroot.
+ */
+  private def testMigration(firstZk: ZkUtils, secondZk: ZkUtils) {
+    info(s"zkConnect string: $zkConnect")
+    firstZk.securePersistentZkPaths.foreach { parent =>
+      info(s"Creating $parent")
+      firstZk.makeSurePersistentPathExists(parent)
+      // Give each znode a child so that the recursive traversal
+      // of the data tree is exercised
+      firstZk.createPersistentPath(s"$parent/fpjwashere", "")
+    }
+    val secureOpt: String = if (secondZk.isSecure) "secure" else "unsecure"
+    val migratorArgs = Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkConnect")
+    ZkSecurityMigrator.run(migratorArgs)
+    info("Done with migration")
+    // Both the parent and the child created above must now carry the ACL
+    // that matches the target mode.
+    secondZk.securePersistentZkPaths.foreach { parent =>
+      val parentAcls = secondZk.zkConnection.getAcl(parent).getKey
+      assertTrue(parent, isAclCorrect(parentAcls, secondZk.isSecure))
+
+      val childPath = parent + "/fpjwashere"
+      val childAcls = secondZk.zkConnection.getAcl(childPath).getKey
+      assertTrue(childPath, isAclCorrect(childAcls, secondZk.isSecure))
+    }
+  }
+
+ /**
+ * Verifies that the path has the appropriate secure ACL.
+ */
+  private def verify(path: String): Boolean = {
+    // True only when every ACL entry on the znode passes the secure check.
+    val acls = zkUtils.zkConnection.getAcl(path).getKey.asScala
+    acls.forall(acl => isAclSecure(acl))
+  }
+
+ /**
+ * Verifies ACL.
+ */
+  private def isAclCorrect(list: java.util.List[ACL], secure: Boolean): Boolean = {
+    // A secure znode is expected to carry two ACL entries and an unsecure
+    // one a single entry; every entry must also pass the matching check.
+    if (secure)
+      list.size == 2 && list.asScala.forall(isAclSecure)
+    else
+      list.size == 1 && list.asScala.forall(isAclUnsecure)
+  }
+
+ /**
+ * Verifies that this ACL is the secure one. The
+ * values are based on the constants used in the
+ * ZooKeeper code base.
+ */
+  private def isAclSecure(acl: ACL): Boolean = {
+    info(s"ACL $acl")
+    val perms = acl.getPerms
+    // 1 and 31 are the numeric values of ZooDefs.Perms.READ and
+    // ZooDefs.Perms.ALL respectively.
+    if (perms == 1)
+      acl.getId.getScheme.equals("world")
+    else if (perms == 31)
+      acl.getId.getScheme.equals("sasl")
+    else
+      false
+  }
+
+ /**
+ * Verifies that the ACL corresponds to the unsecure one.
+ */
+  private def isAclUnsecure(acl: ACL): Boolean = {
+    info(s"ACL $acl")
+    // The only acceptable entry grants permission value 31 to the "world" scheme.
+    acl.getPerms == 31 && acl.getId.getScheme.equals("world")
+  }
+
+ /**
+ * Sets up and starts the recursive execution of deletes.
+ * This is used in the testDelete and testDeleteRecursive
+ * test cases.
+ */
+  private def deleteAllUnsecure() {
+    // Switch the ZooKeeper SASL client off for the client created below.
+    System.setProperty(JaasUtils.ZK_SASL_CLIENT, "false")
+    val result: Try[Boolean] =
+      try {
+        val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
+        try {
+          deleteRecursive(unsecureZkUtils, "/")
+        } finally {
+          unsecureZkUtils.close()
+        }
+      } finally {
+        // Restore the JVM-wide property even when the traversal throws, so
+        // that later tests are not forced to run without SASL.
+        System.clearProperty(JaasUtils.ZK_SASL_CLIENT)
+      }
+
+    // Fail the test if able to delete
+    result match {
+      case Success(_) => // All done
+      case Failure(e) => fail(e.getMessage)
+    }
+  }
+
+ /**
+ * Tries to delete znodes recursively
+ */
+  private def deleteRecursive(zkUtils: ZkUtils, path: String): Try[Boolean] = {
+    info(s"Deleting $path")
+    val isRoot = path == "/"
+    // Visit every child; the outcome is the most recent failure reported
+    // below this znode, or success when no delete went through.
+    val childrenOutcome =
+      zkUtils.getChildren(path).foldLeft[Try[Boolean]](Success(true)) { (outcome, child) =>
+        val childPath = if (isRoot) s"/$child" else s"$path/$child"
+        deleteRecursive(zkUtils, childPath) match {
+          case Failure(e) => Failure(e)
+          case Success(_) => outcome
+        }
+      }
+    if (isRoot) {
+      // Do not try to delete the root
+      childrenOutcome
+    } else {
+      // For all other paths, try to delete it
+      try {
+        zkUtils.deletePath(path)
+        // Getting here means the delete call did not throw
+        Failure(new Exception(s"Have been able to delete $path"))
+      } catch {
+        case e: Exception => childrenOutcome
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 2bca2cf..5fa2f65 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -21,7 +21,9 @@ import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxnFactory
import kafka.utils.TestUtils
import java.net.InetSocketAddress
+import javax.security.auth.login.Configuration
import kafka.utils.CoreUtils
+import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Utils.getPort
class EmbeddedZookeeper() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index eb08f58..cfcdc35 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -17,24 +17,66 @@
package kafka.zk
+import java.util.ArrayList
+import java.util.Collection
+import javax.security.auth.login.Configuration
+
import kafka.consumer.ConsumerConfig
import kafka.utils.ZkUtils
import kafka.utils.ZKCheckedEphemeral
import kafka.utils.TestUtils
+import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.WatchedEvent
import org.apache.zookeeper.Watcher
import org.apache.zookeeper.ZooDefs.Ids
import org.I0Itec.zkclient.exception.{ZkException,ZkNodeExistsException}
-import org.junit.{Test, Assert}
+import org.junit.{After, Before, Test, Assert}
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runner.RunWith;
-class ZKEphemeralTest extends ZooKeeperTestHarness {
- var zkSessionTimeoutMs = 1000
+object ZKEphemeralTest {
+  // Supplies one single-element parameter array per run: true, then false.
+  @Parameters
+  def enableSecurityOptions: Collection[Array[java.lang.Boolean]] = {
+    val options = new ArrayList[Array[java.lang.Boolean]]()
+    for (flag <- Seq(true, false))
+      options.add(Array[java.lang.Boolean](flag))
+    options
+  }
+}
+@RunWith(value = classOf[Parameterized])
+class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
+ val jaasFile: String = "zk-digest-jaas.conf"
+ val authProvider: String = "zookeeper.authProvider.1"
+ var zkSessionTimeoutMs = 1000
+
+ @Before
+  override def setUp() {
+    if (secure) {
+      // Register the JAAS file and the SASL provider before the harness
+      // starts, and make sure the configuration is actually picked up.
+      val jaasPath = getClass.getClassLoader.getResource(jaasFile).getPath
+      System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasPath)
+      System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
+      val securityEnabled =
+        JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))
+      if (!securityEnabled)
+        fail("Secure access not enabled")
+    }
+    super.setUp
+  }
+
+ @After
+  override def tearDown() {
+    // Clear the JVM-wide security properties even when shutting the harness
+    // down throws; otherwise the next parameterized run (secure = false)
+    // would still see the JAAS configuration.
+    try {
+      super.tearDown
+    } finally {
+      System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+      System.clearProperty(authProvider)
+    }
+  }
+
@Test
def testEphemeralNodeCleanup = {
val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
- var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+ var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled(confFile))
try {
zkUtils.createEphemeralPathExpectConflict("/tmp/zktest", "node created")
@@ -46,7 +88,7 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
testData = zkUtils.readData("/tmp/zktest")._1
Assert.assertNotNull(testData)
zkUtils.close
- zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+ zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled(confFile))
val nodeExists = zkUtils.pathExists("/tmp/zktest")
Assert.assertFalse(nodeExists)
}
@@ -54,28 +96,21 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
/*****
***** Tests for ZkWatchedEphemeral
*****/
-
+
/**
* Tests basic creation
*/
@Test
def testZkWatchedEphemeral = {
- val path = "/zwe-test"
+ var path = "/zwe-test"
testCreation(path)
- }
-
- /**
- * Tests recursive creation
- */
- @Test
- def testZkWatchedEphemeralRecursive = {
- val path = "/zwe-test-parent/zwe-test"
+ path = "/zwe-test-parent/zwe-test"
testCreation(path)
}
-
+
private def testCreation(path: String) {
val zk = zkUtils.zkConnection.getZookeeper
- val zwe = new ZKCheckedEphemeral(path, "", zk, false)
+ val zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled(confFile))
var created = false
var counter = 10
@@ -89,7 +124,7 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
zwe.create()
// Waits until the znode is created
TestUtils.waitUntilTrue(() => zkUtils.pathExists(path),
- "Znode %s wasn't created".format(path))
+ s"Znode $path wasn't created")
}
/**
@@ -104,7 +139,7 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
//Creates a second session
val (_, zkConnection2) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeoutMs, zkConnectionTimeout)
val zk2 = zkConnection2.getZookeeper
- var zwe = new ZKCheckedEphemeral(path, "", zk2, false)
+ var zwe = new ZKCheckedEphemeral(path, "", zk2, JaasUtils.isZkSecurityEnabled(confFile))
// Creates znode for path in the first session
zk1.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
@@ -131,7 +166,7 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
// Creates znode for path in the first session
zk.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
- var zwe = new ZKCheckedEphemeral(path, "", zk, false)
+ var zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled(confFile))
//Bootstraps the ZKWatchedEphemeral object
var gotException = false;
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/983b1f9e/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 38a0765..f567555 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -17,30 +17,73 @@
package kafka.zk
+import java.io._
+import java.net._
+import javax.security.auth.login.Configuration
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
-import kafka.utils.{ZkUtils, CoreUtils}
+import kafka.utils.{ZkUtils, Logging, CoreUtils}
import org.junit.{After, Before}
import org.scalatest.junit.JUnitSuite
+import org.apache.kafka.common.security.JaasUtils
-trait ZooKeeperTestHarness extends JUnitSuite {
- var zkPort: Int = -1
+object FourLetterWords {
+  /**
+   * Connects to the given address and writes the "stat" four-letter
+   * command. The reply is not read: the call only reports, by throwing or
+   * not, whether something accepted the connection and the write.
+   *
+   * @param host    host to connect to; null selects the loopback address
+   * @param port    port to connect to
+   * @param timeout connect timeout in milliseconds, passed to Socket.connect
+   * @throws IOException if connecting or writing fails; a timeout is
+   *                     reported as an IOException carrying the original
+   *                     SocketTimeoutException as its cause
+   */
+  def sendStat(host: String, port: Int, timeout: Int) {
+    val hostAddress = if (host != null)
+      new InetSocketAddress(host, port)
+    else
+      new InetSocketAddress(InetAddress.getByName(null), port)
+    val sock = new Socket()
+    try {
+      // Connect inside the try so that the socket is closed when the
+      // connection attempt itself fails.
+      sock.connect(hostAddress, timeout)
+      val outstream = sock.getOutputStream
+      outstream.write("stat".getBytes)
+      outstream.flush()
+    } catch {
+      case e: SocketTimeoutException =>
+        throw new IOException("Exception while sending 4lw", e)
+    } finally {
+      sock.close()
+    }
+  }
+}
+
+trait ZooKeeperTestHarness extends JUnitSuite with Logging {
var zookeeper: EmbeddedZookeeper = null
+ var zkPort: Int = -1
var zkUtils: ZkUtils = null
val zkConnectionTimeout = 6000
val zkSessionTimeout = 6000
def zkConnect: String = "127.0.0.1:" + zkPort
-
+ def confFile: String = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "")
+
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
zkPort = zookeeper.port
-    zkUtils = ZkUtils.apply(zkConnect, zkSessionTimeout, zkConnectionTimeout, false)
+    // The client is secure only when the JAAS file named by the login config
+    // system property enables ZooKeeper security. confFile reads that
+    // property, so the lookup is not repeated here.
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled(confFile))
}
@After
def tearDown() {
-    CoreUtils.swallow(zkUtils.close())
-    CoreUtils.swallow(zookeeper.shutdown())
+    if (zkUtils != null)
+     CoreUtils.swallow(zkUtils.close())
+    if (zookeeper != null)
+      CoreUtils.swallow(zookeeper.shutdown())
+
+    // Wait until the server no longer accepts the "stat" probe. The wait is
+    // bounded and backs off between probes, so a server that fails to shut
+    // down produces a test error instead of spinning forever.
+    val maxWaitMs = 30000L
+    val deadlineMs = System.currentTimeMillis + maxWaitMs
+    var isDown = false
+    while (!isDown) {
+      isDown = try {
+        FourLetterWords.sendStat("127.0.0.1", zkPort, 3000)
+        false
+      } catch {
+        // Any non-fatal failure to connect or write means the server is gone
+        case scala.util.control.NonFatal(_) =>
+          info("Server is down")
+          true
+      }
+      if (!isDown) {
+        if (System.currentTimeMillis > deadlineMs)
+          throw new IllegalStateException(s"ZooKeeper server on port $zkPort is still up $maxWaitMs ms after shutdown")
+        Thread.sleep(100)
+      }
+    }
}
-
}