You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2018/11/30 05:41:49 UTC
[kafka] branch trunk updated: KAFKA-7259;
Remove deprecated ZKUtils usage from ZkSecurityMigrator
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 944f24c KAFKA-7259; Remove deprecated ZKUtils usage from ZkSecurityMigrator
944f24c is described below
commit 944f24cfdc8edd2b4ef89ed40a55a480e8f89632
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Fri Nov 30 11:11:25 2018 +0530
KAFKA-7259; Remove deprecated ZKUtils usage from ZkSecurityMigrator
- Remove ZKUtils usage from various tests
Author: Manikumar Reddy <ma...@gmail.com>
Reviewers: Sriharsha Chintalapani <sr...@apache.org>, Ismael Juma <is...@juma.me.uk>, Satish Duggana <sa...@apache.org>, Jun Rao <ju...@gmail.com>, Ryanne Dolan <ry...@gmail.com>
Closes #5480 from omkreddy/zkutils
---
.../scala/kafka/admin/ZkSecurityMigrator.scala | 36 ++--
.../main/scala/kafka/tools/DumpLogSegments.scala | 1 -
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 85 ++++++---
.../scala/kafka/zk/ZkSecurityMigratorUtils.scala | 30 ++++
.../kafka/api/SaslPlainPlaintextConsumerTest.scala | 11 +-
.../SaslPlainSslEndToEndAuthorizationTest.scala | 7 +-
.../other/kafka/ReplicationQuotasTestRig.scala | 4 +-
.../scala/unit/kafka/admin/TopicCommandTest.scala | 7 +-
.../kafka/security/auth/ZkAuthorizationTest.scala | 199 ++++++++++++---------
.../test/scala/unit/kafka/utils/TestUtils.scala | 32 ++--
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 29 +++
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 2 +-
12 files changed, 279 insertions(+), 164 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index a833db4..5cab801 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -17,9 +17,11 @@
package kafka.admin
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging, ZkUtils}
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging}
+import kafka.zk.{KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
import org.I0Itec.zkclient.exception.ZkException
import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
@@ -92,8 +94,9 @@ object ZkSecurityMigrator extends Logging {
val zkUrl = opts.options.valueOf(opts.zkUrlOpt)
val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue
- val zkUtils = ZkUtils(zkUrl, zkSessionTimeout, zkConnectionTimeout, zkAcl)
- val migrator = new ZkSecurityMigrator(zkUtils)
+ val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout,
+ Int.MaxValue, Time.SYSTEM)
+ val migrator = new ZkSecurityMigrator(zkClient)
migrator.run()
}
@@ -120,20 +123,21 @@ object ZkSecurityMigrator extends Logging {
}
}
-class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
+class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
+ private val zkSecurityMigratorUtils = new ZkSecurityMigratorUtils(zkClient)
private val futures = new Queue[Future[String]]
- private def setAcl(path: String, setPromise: Promise[String]) = {
+ private def setAcl(path: String, setPromise: Promise[String]): Unit = {
info("Setting ACL for path %s".format(path))
- zkUtils.zkConnection.getZookeeper.setACL(path, zkUtils.defaultAcls(path), -1, SetACLCallback, setPromise)
+ zkSecurityMigratorUtils.currentZooKeeper.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, setPromise)
}
- private def getChildren(path: String, childrenPromise: Promise[String]) = {
+ private def getChildren(path: String, childrenPromise: Promise[String]): Unit = {
info("Getting children to set ACLs for path %s".format(path))
- zkUtils.zkConnection.getZookeeper.getChildren(path, false, GetChildrenCallback, childrenPromise)
+ zkSecurityMigratorUtils.currentZooKeeper.getChildren(path, false, GetChildrenCallback, childrenPromise)
}
- private def setAclIndividually(path: String) = {
+ private def setAclIndividually(path: String): Unit = {
val setPromise = Promise[String]
futures.synchronized {
futures += setPromise.future
@@ -141,7 +145,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
setAcl(path, setPromise)
}
- private def setAclsRecursively(path: String) = {
+ private def setAclsRecursively(path: String): Unit = {
val setPromise = Promise[String]
val childrenPromise = Promise[String]
futures.synchronized {
@@ -157,7 +161,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
path: String,
ctx: Object,
children: java.util.List[String]) {
- val zkHandle = zkUtils.zkConnection.getZookeeper
+ val zkHandle = zkSecurityMigratorUtils.currentZooKeeper
val promise = ctx.asInstanceOf[Promise[String]]
Code.get(rc) match {
case Code.OK =>
@@ -191,7 +195,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
path: String,
ctx: Object,
stat: Stat) {
- val zkHandle = zkUtils.zkConnection.getZookeeper
+ val zkHandle = zkSecurityMigratorUtils.currentZooKeeper
val promise = ctx.asInstanceOf[Promise[String]]
Code.get(rc) match {
@@ -199,7 +203,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
info("Successfully set ACLs for %s".format(path))
promise success "done"
case Code.CONNECTIONLOSS =>
- zkHandle.setACL(path, zkUtils.defaultAcls(path), -1, SetACLCallback, ctx)
+ zkHandle.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, ctx)
case Code.NONODE =>
warn("Znode is gone, it could be have been legitimately deleted: %s".format(path))
promise success "done"
@@ -218,9 +222,9 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
private def run(): Unit = {
try {
setAclIndividually("/")
- for (path <- ZkUtils.SecureZkRootPaths) {
+ for (path <- ZkData.SecureRootPaths) {
debug("Going to set ACL for %s".format(path))
- zkUtils.makeSurePersistentPathExists(path)
+ zkClient.makeSurePersistentPathExists(path)
setAclsRecursively(path)
}
@@ -240,7 +244,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
recurse()
} finally {
- zkUtils.close
+ zkClient.close
}
}
}
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 4c8c4e1..281e920 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -20,7 +20,6 @@ package kafka.tools
import java.io._
import java.nio.ByteBuffer
-import joptsimple.OptionParser
import kafka.coordinator.group.{GroupMetadataKey, GroupMetadataManager, OffsetKey}
import kafka.coordinator.transaction.TransactionLog
import kafka.log._
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 4ad40ef..732a827 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -81,8 +81,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
* @param data the znode data
* @return the created path (including the appended monotonically increasing number)
*/
- private[zk] def createSequentialPersistentPath(path: String, data: Array[Byte]): String = {
- val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+ private[kafka] def createSequentialPersistentPath(path: String, data: Array[Byte]): String = {
+ val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest)
createResponse.maybeThrow
createResponse.name
@@ -137,7 +137,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
try {
val transaction = zooKeeperClient.createTransaction()
transaction.create(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp),
- acls(ControllerZNode.path).asJava, CreateMode.EPHEMERAL)
+ defaultAcls(ControllerZNode.path).asJava, CreateMode.EPHEMERAL)
transaction.setData(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion)
val results = transaction.commit()
val setDataResult = results.get(1).asInstanceOf[SetDataResult]
@@ -214,7 +214,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
val path = TopicPartitionStateZNode.path(partition)
val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
- CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
+ CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
}
retryRequestsUntilConnected(createRequests.toSeq)
}
@@ -237,7 +237,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
*/
def createControllerEpochRaw(epoch: Int): CreateResponse = {
val createRequest = CreateRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch),
- acls(ControllerEpochZNode.path), CreateMode.PERSISTENT)
+ defaultAcls(ControllerEpochZNode.path), CreateMode.PERSISTENT)
retryRequestUntilConnected(createRequest)
}
@@ -384,7 +384,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
def createConfigChangeNotification(sanitizedEntityPath: String): Unit = {
makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
val path = ConfigEntityChangeNotificationSequenceZNode.createPath
- val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+ val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest)
createResponse.maybeThrow()
}
@@ -803,7 +803,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
def create(reassignmentData: Array[Byte]): CreateResponse = {
- val createRequest = CreateRequest(ReassignPartitionsZNode.path, reassignmentData, acls(ReassignPartitionsZNode.path),
+ val createRequest = CreateRequest(ReassignPartitionsZNode.path, reassignmentData, defaultAcls(ReassignPartitionsZNode.path),
CreateMode.PERSISTENT, zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
retryRequestUntilConnected(createRequest)
}
@@ -832,9 +832,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
* @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
*/
def deletePartitionReassignment(expectedControllerEpochZkVersion: Int): Unit = {
- val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, ZkVersion.MatchAnyVersion,
- zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
- retryRequestUntilConnected(deleteRequest)
+ deletePath(ReassignPartitionsZNode.path, expectedControllerEpochZkVersion)
}
/**
@@ -1114,7 +1112,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
def createAclsForResourceIfNotExists(resource: Resource, aclsSet: Set[Acl]): (Boolean, Int) = {
def create(aclData: Array[Byte]): CreateResponse = {
val path = ResourceZNode.path(resource)
- val createRequest = CreateRequest(path, aclData, acls(path), CreateMode.PERSISTENT)
+ val createRequest = CreateRequest(path, aclData, defaultAcls(path), CreateMode.PERSISTENT)
retryRequestUntilConnected(createRequest)
}
@@ -1134,7 +1132,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
*/
def createAclChangeNotification(resource: Resource): Unit = {
val aclChange = ZkAclStore(resource.patternType).changeStore.createChangeNode(resource)
- val createRequest = CreateRequest(aclChange.path, aclChange.bytes, acls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
+ val createRequest = CreateRequest(aclChange.path, aclChange.bytes, defaultAcls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest)
createResponse.maybeThrow
}
@@ -1241,11 +1239,21 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
/**
* Deletes the zk node recursively
- * @param path
- * @return return true if it succeeds, false otherwise
+ * @param path path to delete
+ * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
+ * @param recursiveDelete enable recursive delete
+ * @return KeeperException if there is an error while deleting the path
*/
- def deletePath(path: String): Boolean = {
- deleteRecursive(path)
+ def deletePath(path: String, expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion, recursiveDelete: Boolean = true): Unit = {
+ if (recursiveDelete)
+ deleteRecursive(path, expectedControllerEpochZkVersion)
+ else {
+ val deleteRequest = DeleteRequest(path, ZkVersion.MatchAnyVersion, zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
+ val deleteResponse = retryRequestUntilConnected(deleteRequest)
+ if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
+ throw deleteResponse.resultException.get
+ }
+ }
}
/**
@@ -1262,7 +1270,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
*/
def createTokenChangeNotification(tokenId: String): Unit = {
val path = DelegationTokenChangeNotificationSequenceZNode.createPath
- val createRequest = CreateRequest(path, DelegationTokenChangeNotificationSequenceZNode.encode(tokenId), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+ val createRequest = CreateRequest(path, DelegationTokenChangeNotificationSequenceZNode.encode(tokenId), defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest)
createResponse.resultException.foreach(e => throw e)
}
@@ -1283,7 +1291,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
def create(tokenData: Array[Byte]): CreateResponse = {
val path = DelegationTokenInfoZNode.path(token.tokenInfo().tokenId())
- val createRequest = CreateRequest(path, tokenData, acls(path), CreateMode.PERSISTENT)
+ val createRequest = CreateRequest(path, tokenData, defaultAcls(path), CreateMode.PERSISTENT)
retryRequestUntilConnected(createRequest)
}
@@ -1440,6 +1448,31 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
/**
+ * Return the ACLs of the node of the given path
+ * @param path the given path for the node
+ * @return the ACL array of the given node.
+ */
+ def getAcl(path: String): Seq[ACL] = {
+ val getAclRequest = GetAclRequest(path)
+ val getAclResponse = retryRequestUntilConnected(getAclRequest)
+ getAclResponse.resultCode match {
+ case Code.OK => getAclResponse.acl
+ case _ => throw getAclResponse.resultException.get
+ }
+ }
+
+ /**
+ * sets the ACLs to the node of the given path
+ * @param path the given path for the node
+ * @param acl the given acl for the node
+ */
+ def setAcl(path: String, acl: Seq[ACL]): Unit = {
+ val setAclRequest = SetAclRequest(path, acl, ZkVersion.MatchAnyVersion)
+ val setAclResponse = retryRequestUntilConnected(setAclRequest)
+ setAclResponse.maybeThrow
+ }
+
+ /**
* Create the cluster Id. If the cluster id already exists, return the current cluster id.
* @return cluster id
*/
@@ -1529,7 +1562,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
}
- private[zk] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = {
+ private[kafka] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = {
def parentPath(path: String): String = {
val indexOfLastSlash = path.lastIndexOf("/")
@@ -1538,7 +1571,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
def createRecursive0(path: String): Unit = {
- val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
+ val createRequest = CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT)
var createResponse = retryRequestUntilConnected(createRequest)
if (createResponse.resultCode == Code.NONODE) {
createRecursive0(parentPath(path))
@@ -1551,7 +1584,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
}
}
- val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT)
+ val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT)
var createResponse = retryRequestUntilConnected(createRequest)
if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) {
@@ -1569,7 +1602,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
private def createTopicPartition(partitions: Seq[TopicPartition], expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = {
val createRequests = partitions.map { partition =>
val path = TopicPartitionZNode.path(partition)
- CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
+ CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
}
retryRequestsUntilConnected(createRequests)
}
@@ -1577,7 +1610,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
private def createTopicPartitions(topics: Seq[String], expectedControllerEpochZkVersion: Int):Seq[CreateResponse] = {
val createRequests = topics.map { topic =>
val path = TopicPartitionsZNode.path(topic)
- CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic), controllerZkVersionCheck(expectedControllerEpochZkVersion))
+ CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT, Some(topic), controllerZkVersionCheck(expectedControllerEpochZkVersion))
}
retryRequestsUntilConnected(createRequests)
}
@@ -1589,7 +1622,9 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
retryRequestsUntilConnected(getDataRequests)
}
- private def acls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path)
+ def defaultAcls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path)
+
+ def secure: Boolean = isSecure
private[zk] def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
retryRequestsUntilConnected(Seq(request)).head
@@ -1655,7 +1690,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
def create(): Code = {
- val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL)
+ val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.EPHEMERAL)
val createResponse = retryRequestUntilConnected(createRequest)
val createResultCode = createResponse.resultCode match {
case code@ Code.OK =>
diff --git a/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala b/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala
new file mode 100644
index 0000000..31a7ba2
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import org.apache.zookeeper.ZooKeeper
+
+/**
+ * This class should only be used in ZkSecurityMigrator tool.
+ * This class will be removed after we migrate ZkSecurityMigrator away from ZK's asynchronous API.
+ * @param kafkaZkClient
+ */
+class ZkSecurityMigratorUtils(val kafkaZkClient: KafkaZkClient) {
+
+ def currentZooKeeper: ZooKeeper = kafkaZkClient.currentZooKeeper
+
+}
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index 5789d1a..c15a508 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -16,9 +16,8 @@ import java.io.File
import java.util.Locale
import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils, ZkUtils}
+import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.{After, Before, Test}
@@ -29,6 +28,8 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
private val kafkaServerJaasEntryName =
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
+ // disable secure acls of zkClient in ZooKeeperTestHarness
+ override protected def zkAclsEnabled = Some(false)
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
@@ -36,7 +37,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
@Before
override def setUp(): Unit = {
- startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
+ startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, kafkaServerJaasEntryName))
super.setUp()
}
@@ -52,8 +53,6 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
*/
@Test
def testZkAclsDisabled() {
- val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
- TestUtils.verifyUnsecureZkAcls(zkUtils)
- CoreUtils.swallow(zkUtils.close(), this)
+ TestUtils.verifyUnsecureZkAcls(zkClient)
}
}
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index efb8c48..bbe0dd8 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -22,12 +22,11 @@ import javax.security.auth.Subject
import javax.security.auth.login.AppConfigurationEntry
import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.utils.{TestUtils}
import kafka.utils.JaasTestUtils._
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback
import org.junit.Test
@@ -134,8 +133,6 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
*/
@Test
def testAcls() {
- val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
- TestUtils.verifySecureZkAcls(zkUtils, 1)
- CoreUtils.swallow(zkUtils.close(), this)
+ TestUtils.verifySecureZkAcls(zkClient, 1)
}
}
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 9c6ae0b..20c28e7 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -19,16 +19,16 @@ package kafka
import java.io.{File, PrintWriter}
import java.nio.file.{Files, StandardOpenOption}
-
import javax.imageio.ImageIO
+
import kafka.admin.ReassignPartitionsCommand
import kafka.admin.ReassignPartitionsCommand.Throttle
-import org.apache.kafka.common.TopicPartition
import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
import kafka.utils.TestUtils._
import kafka.utils.{Exit, Logging, TestUtils, ZkUtils}
import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness}
import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
import org.jfree.chart.plot.PlotOrientation
import org.jfree.chart.{ChartFactory, ChartFrame, JFreeChart}
import org.jfree.data.xy.{XYSeries, XYSeriesCollection}
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 5d2d873..5cfab90 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -20,10 +20,9 @@ import org.junit.Assert._
import org.junit.Test
import kafka.utils.Logging
import kafka.utils.TestUtils
-import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
+import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode, ZooKeeperTestHarness}
import kafka.server.ConfigType
import kafka.admin.TopicCommand.TopicCommandOptions
-import kafka.utils.ZkUtils.getDeleteTopicPath
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.config.ConfigException
@@ -80,7 +79,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// delete the NormalTopic
val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
- val deletePath = getDeleteTopicPath(normalTopic)
+ val deletePath = DeleteTopicsTopicZNode.path(normalTopic)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deletePath))
TopicCommand.deleteTopic(zkClient, deleteOpts)
assertTrue("Delete path for topic should exist after deletion.", zkClient.pathExists(deletePath))
@@ -93,7 +92,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
- val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME)
+ val deleteOffsetTopicPath = DeleteTopicsTopicZNode.path(Topic.GROUP_METADATA_TOPIC_NAME)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deleteOffsetTopicPath))
intercept[AdminOperationException] {
TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 1cdbe4b..de5ae22 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -17,23 +17,32 @@
package kafka.security.auth
+import java.nio.charset.StandardCharsets
+
import kafka.admin.ZkSecurityMigrator
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
-import kafka.zk.{ConsumerPathZNode, ZooKeeperTestHarness}
-import org.apache.kafka.common.KafkaException
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk._
+import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.data.{ACL, Stat}
import org.junit.Assert._
import org.junit.{After, Before, Test}
-import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import javax.security.auth.login.Configuration
+import kafka.api.ApiVersion
+import kafka.cluster.{Broker, EndPoint}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.JavaConverters._
+import scala.collection.Seq
+
class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
val authProvider = "zookeeper.authProvider.1"
- var zkUtils: ZkUtils = null
@Before
override def setUp() {
@@ -41,13 +50,10 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
Configuration.setConfiguration(null)
System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
super.setUp()
- zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
}
@After
override def tearDown() {
- if (zkUtils != null)
- CoreUtils.swallow(zkUtils.close(), this)
super.tearDown()
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
System.clearProperty(authProvider)
@@ -59,7 +65,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
* secure ACLs and authentication with ZooKeeper.
*/
@Test
- def testIsZkSecurityEnabled() {
+ def testIsZkSecurityEnabled(): Unit = {
assertTrue(JaasUtils.isZkSecurityEnabled())
Configuration.setConfiguration(null)
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
@@ -75,59 +81,76 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
}
/**
- * Exercises the code in ZkUtils. The goal is mainly
- * to verify that the behavior of ZkUtils is correct
+ * Exercises the code in KafkaZkClient. The goal is mainly
+ * to verify that the behavior of KafkaZkClient is correct
* when isSecure is set to true.
*/
@Test
- def testZkUtils() {
- assertTrue(zkUtils.isSecure)
- for (path <- zkUtils.persistentZkPaths) {
- zkUtils.makeSurePersistentPathExists(path)
- if (ZkUtils.sensitivePath(path)) {
- val aclList = zkUtils.zkConnection.getAcl(path).getKey
+ def testKafkaZkClient(): Unit = {
+ assertTrue(zkClient.secure)
+ for (path <- ZkData.PersistentZkPaths) {
+ zkClient.makeSurePersistentPathExists(path)
+ if (ZkData.sensitivePath(path)) {
+ val aclList = zkClient.getAcl(path)
assertEquals(s"Unexpected acl list size for $path", 1, aclList.size)
- for (acl <- aclList.asScala)
+ for (acl <- aclList)
assertTrue(TestUtils.isAclSecure(acl, sensitive = true))
- } else if (!path.equals(ZkUtils.ConsumersPath)) {
- val aclList = zkUtils.zkConnection.getAcl(path).getKey
+ } else if (!path.equals(ConsumerPathZNode.path)) {
+ val aclList = zkClient.getAcl(path)
assertEquals(s"Unexpected acl list size for $path", 2, aclList.size)
- for (acl <- aclList.asScala)
+ for (acl <- aclList)
assertTrue(TestUtils.isAclSecure(acl, sensitive = false))
}
}
- // Test that can create: createEphemeralPathExpectConflict
- zkUtils.createEphemeralPathExpectConflict("/a", "")
- verify("/a")
- // Test that can create: createPersistentPath
- zkUtils.createPersistentPath("/b")
- verify("/b")
+
+ // Test that creates Ephemeral node
+ val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+ zkClient.registerBroker(brokerInfo)
+ verify(brokerInfo.path)
+
+ // Test that creates persistent nodes
+ val topic1 = "topic1"
+ val assignment = Map(
+ new TopicPartition(topic1, 0) -> Seq(0, 1),
+ new TopicPartition(topic1, 1) -> Seq(0, 1),
+ new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
+ )
+
+ // create a topic assignment
+ zkClient.createTopicAssignment(topic1, assignment)
+ verify(TopicZNode.path(topic1))
+
// Test that can create: createSequentialPersistentPath
- val seqPath = zkUtils.createSequentialPersistentPath("/c", "")
+ val seqPath = zkClient.createSequentialPersistentPath("/c", "".getBytes(StandardCharsets.UTF_8))
verify(seqPath)
- // Test that can update: updateEphemeralPath
- zkUtils.updateEphemeralPath("/a", "updated")
- val valueA: String = zkUtils.zkClient.readData("/a")
- assertTrue(valueA.equals("updated"))
- // Test that can update: updatePersistentPath
- zkUtils.updatePersistentPath("/b", "updated")
- val valueB: String = zkUtils.zkClient.readData("/b")
- assertTrue(valueB.equals("updated"))
- info("Leaving testZkUtils")
+ // Test that can update Ephemeral node
+ val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+ zkClient.updateBrokerInfo(updatedBrokerInfo)
+ assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
+
+ // Test that can update persistent nodes
+ val updatedAssignment = assignment - new TopicPartition(topic1, 2)
+ zkClient.setTopicAssignment(topic1, updatedAssignment)
+ assertEquals(updatedAssignment.size, zkClient.getTopicPartitionCount(topic1).get)
}
+ private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
+ rack: Option[String] = None): BrokerInfo =
+ BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
+ (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
+
/**
* Tests the migration tool when making an unsecure
* cluster secure.
*/
@Test
- def testZkMigration() {
- val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
+ def testZkMigration(): Unit = {
+ val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
try {
- testMigration(zkConnect, unsecureZkUtils, zkUtils)
+ testMigration(zkConnect, unsecureZkClient, zkClient)
} finally {
- unsecureZkUtils.close()
+ unsecureZkClient.close()
}
}
@@ -136,12 +159,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
* cluster unsecure.
*/
@Test
- def testZkAntiMigration() {
- val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
+ def testZkAntiMigration(): Unit = {
+ val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
try {
- testMigration(zkConnect, zkUtils, unsecureZkUtils)
+ testMigration(zkConnect, zkClient, unsecureZkClient)
} finally {
- unsecureZkUtils.close()
+ unsecureZkClient.close()
}
}
@@ -149,42 +172,42 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
* Tests that the persistent paths cannot be deleted.
*/
@Test
- def testDelete() {
+ def testDelete(): Unit = {
info(s"zkConnect string: $zkConnect")
ZkSecurityMigrator.run(Array("--zookeeper.acl=secure", s"--zookeeper.connect=$zkConnect"))
deleteAllUnsecure()
}
/**
- * Tests that znodes cannot be deleted when the
+ * Tests that znodes cannot be deleted when the
* persistent paths have children.
*/
@Test
- def testDeleteRecursive() {
+ def testDeleteRecursive(): Unit = {
info(s"zkConnect string: $zkConnect")
- for (path <- ZkUtils.SecureZkRootPaths) {
+ for (path <- ZkData.SecureRootPaths) {
info(s"Creating $path")
- zkUtils.makeSurePersistentPathExists(path)
- zkUtils.createPersistentPath(s"$path/fpjwashere", "")
+ zkClient.makeSurePersistentPathExists(path)
+ zkClient.createRecursive(s"$path/fpjwashere", "".getBytes(StandardCharsets.UTF_8))
}
- zkUtils.zkConnection.setAcl("/", zkUtils.defaultAcls("/"), -1)
+ zkClient.setAcl("/", zkClient.defaultAcls("/"))
deleteAllUnsecure()
}
-
+
/**
* Tests the migration tool when chroot is being used.
*/
@Test
def testChroot(): Unit = {
val zkUrl = zkConnect + "/kafka"
- zkUtils.createPersistentPath("/kafka")
- val unsecureZkUtils = ZkUtils(zkUrl, 6000, 6000, false)
- val secureZkUtils = ZkUtils(zkUrl, 6000, 6000, true)
+ zkClient.createRecursive("/kafka")
+ val unsecureZkClient = KafkaZkClient(zkUrl, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
+ val secureZkClient = KafkaZkClient(zkUrl, true, 6000, 6000, Int.MaxValue, Time.SYSTEM)
try {
- testMigration(zkUrl, unsecureZkUtils, secureZkUtils)
+ testMigration(zkUrl, unsecureZkClient, secureZkClient)
} finally {
- unsecureZkUtils.close()
- secureZkUtils.close()
+ unsecureZkClient.close()
+ secureZkClient.close()
}
}
@@ -192,62 +215,62 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
* Exercises the migration tool. It is used in these test cases:
* testZkMigration, testZkAntiMigration, testChroot.
*/
- private def testMigration(zkUrl: String, firstZk: ZkUtils, secondZk: ZkUtils) {
+ private def testMigration(zkUrl: String, firstZk: KafkaZkClient, secondZk: KafkaZkClient): Unit = {
info(s"zkConnect string: $zkUrl")
- for (path <- ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths) {
+ for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
info(s"Creating $path")
firstZk.makeSurePersistentPathExists(path)
// Create a child for each znode to exercise the recurrent
// traversal of the data tree
- firstZk.createPersistentPath(s"$path/fpjwashere", "")
+ firstZk.createRecursive(s"$path/fpjwashere", "".getBytes(StandardCharsets.UTF_8))
}
// Getting security option to determine how to verify ACLs.
// Additionally, we create the consumers znode (not in
// securePersistentZkPaths) to make sure that we don't
// add ACLs to it.
val secureOpt: String =
- if (secondZk.isSecure) {
- firstZk.createPersistentPath(ZkUtils.ConsumersPath)
+ if (secondZk.secure) {
+ firstZk.createRecursive(ConsumerPathZNode.path)
"secure"
} else {
- secondZk.createPersistentPath(ZkUtils.ConsumersPath)
+ secondZk.createRecursive(ConsumerPathZNode.path)
"unsecure"
}
ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl"))
info("Done with migration")
- for (path <- ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths) {
- val sensitive = ZkUtils.sensitivePath(path)
- val listParent = secondZk.zkConnection.getAcl(path).getKey
- assertTrue(path, isAclCorrect(listParent, secondZk.isSecure, sensitive))
+ for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
+ val sensitive = ZkData.sensitivePath(path)
+ val listParent = secondZk.getAcl(path)
+ assertTrue(path, isAclCorrect(listParent, secondZk.secure, sensitive))
val childPath = path + "/fpjwashere"
- val listChild = secondZk.zkConnection.getAcl(childPath).getKey
- assertTrue(childPath, isAclCorrect(listChild, secondZk.isSecure, sensitive))
+ val listChild = secondZk.getAcl(childPath)
+ assertTrue(childPath, isAclCorrect(listChild, secondZk.secure, sensitive))
}
// Check consumers path.
- val consumersAcl = firstZk.zkConnection.getAcl(ZkUtils.ConsumersPath).getKey
- assertTrue(ZkUtils.ConsumersPath, isAclCorrect(consumersAcl, false, false))
+ val consumersAcl = firstZk.getAcl(ConsumerPathZNode.path)
+ assertTrue(ConsumerPathZNode.path, isAclCorrect(consumersAcl, false, false))
}
/**
* Verifies that the path has the appropriate secure ACL.
*/
- private def verify(path: String): Boolean = {
- val sensitive = ZkUtils.sensitivePath(path)
- val list = zkUtils.zkConnection.getAcl(path).getKey
- list.asScala.forall(TestUtils.isAclSecure(_, sensitive))
+ private def verify(path: String): Unit = {
+ val sensitive = ZkData.sensitivePath(path)
+ val list = zkClient.getAcl(path)
+ assertTrue(list.forall(TestUtils.isAclSecure(_, sensitive)))
}
/**
* Verifies ACL.
*/
- private def isAclCorrect(list: java.util.List[ACL], secure: Boolean, sensitive: Boolean): Boolean = {
+ private def isAclCorrect(list: Seq[ACL], secure: Boolean, sensitive: Boolean): Boolean = {
val isListSizeCorrect =
if (secure && !sensitive)
list.size == 2
else
list.size == 1
- isListSizeCorrect && list.asScala.forall(
+ isListSizeCorrect && list.forall(
if (secure)
TestUtils.isAclSecure(_, sensitive)
else
@@ -260,14 +283,14 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
* This is used in the testDelete and testDeleteRecursive
* test cases.
*/
- private def deleteAllUnsecure() {
+ private def deleteAllUnsecure(): Unit = {
System.setProperty(JaasUtils.ZK_SASL_CLIENT, "false")
- val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
+ val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
val result: Try[Boolean] = {
- deleteRecursive(unsecureZkUtils, "/")
+ deleteRecursive(unsecureZkClient, "/")
}
// Clean up before leaving the test case
- unsecureZkUtils.close()
+ unsecureZkClient.close()
System.clearProperty(JaasUtils.ZK_SASL_CLIENT)
// Fail the test if able to delete
@@ -280,13 +303,13 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
/**
* Tries to delete znodes recursively
*/
- private def deleteRecursive(zkUtils: ZkUtils, path: String): Try[Boolean] = {
+ private def deleteRecursive(zkClient: KafkaZkClient, path: String): Try[Boolean] = {
info(s"Deleting $path")
var result: Try[Boolean] = Success(true)
- for (child <- zkUtils.getChildren(path))
+ for (child <- zkClient.getChildren(path))
result = (path match {
- case "/" => deleteRecursive(zkUtils, s"/$child")
- case path => deleteRecursive(zkUtils, s"$path/$child")
+ case "/" => deleteRecursive(zkClient, s"/$child")
+ case path => deleteRecursive(zkClient, s"$path/$child")
}) match {
case Success(_) => result
case Failure(e) => Failure(e)
@@ -297,7 +320,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
// For all other paths, try to delete it
case path =>
try {
- zkUtils.deletePath(path)
+ zkClient.deletePath(path, recursiveDelete = false)
Failure(new Exception(s"Have been able to delete $path"))
} catch {
case _: Exception => result
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c3e7312..b5a7583 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,8 +26,8 @@ import java.security.cert.X509Certificate
import java.time.Duration
import java.util.{Collections, Properties}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
-
import javax.net.ssl.X509TrustManager
+
import kafka.api._
import kafka.cluster.{Broker, EndPoint}
import kafka.log._
@@ -1132,30 +1132,30 @@ object TestUtils extends Logging {
}
}
- private def secureZkPaths(zkUtils: ZkUtils): Seq[String] = {
+ private def secureZkPaths(zkClient: KafkaZkClient): Seq[String] = {
def subPaths(path: String): Seq[String] = {
- if (zkUtils.pathExists(path))
- path +: zkUtils.getChildren(path).map(c => path + "/" + c).flatMap(subPaths)
+ if (zkClient.pathExists(path))
+ path +: zkClient.getChildren(path).map(c => path + "/" + c).flatMap(subPaths)
else
Seq.empty
}
- val topLevelPaths = ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths
+ val topLevelPaths = ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths
topLevelPaths.flatMap(subPaths)
}
/**
* Verifies that all secure paths in ZK are created with the expected ACL.
*/
- def verifySecureZkAcls(zkUtils: ZkUtils, usersWithAccess: Int) {
- secureZkPaths(zkUtils).foreach(path => {
- if (zkUtils.pathExists(path)) {
- val sensitive = ZkUtils.sensitivePath(path)
+ def verifySecureZkAcls(zkClient: KafkaZkClient, usersWithAccess: Int) {
+ secureZkPaths(zkClient).foreach(path => {
+ if (zkClient.pathExists(path)) {
+ val sensitive = ZkData.sensitivePath(path)
// usersWithAccess have ALL access to path. For paths that are
// not sensitive, world has READ access.
val aclCount = if (sensitive) usersWithAccess else usersWithAccess + 1
- val acls = zkUtils.zkConnection.getAcl(path).getKey
+ val acls = zkClient.getAcl(path)
assertEquals(s"Invalid ACLs for $path $acls", aclCount, acls.size)
- acls.asScala.foreach(acl => isAclSecure(acl, sensitive))
+ acls.foreach(acl => isAclSecure(acl, sensitive))
}
})
}
@@ -1164,12 +1164,12 @@ object TestUtils extends Logging {
* Verifies that secure paths in ZK have no access control. This is
* the case when zookeeper.set.acl=false and no ACLs have been configured.
*/
- def verifyUnsecureZkAcls(zkUtils: ZkUtils) {
- secureZkPaths(zkUtils).foreach(path => {
- if (zkUtils.pathExists(path)) {
- val acls = zkUtils.zkConnection.getAcl(path).getKey
+ def verifyUnsecureZkAcls(zkClient: KafkaZkClient) {
+ secureZkPaths(zkClient).foreach(path => {
+ if (zkClient.pathExists(path)) {
+ val acls = zkClient.getAcl(path)
assertEquals(s"Invalid ACLs for $path $acls", 1, acls.size)
- acls.asScala.foreach(isAclUnsecure)
+ acls.foreach(isAclUnsecure)
}
})
}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 6de9159..a8df342 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -44,6 +44,7 @@ import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zookeeper._
import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.data.Stat
class KafkaZkClientTest extends ZooKeeperTestHarness {
@@ -543,6 +544,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.createRecursive(path)
zkClient.deletePath(path)
assertFalse(zkClient.pathExists(path))
+
+ zkClient.createRecursive(path)
+ zkClient.deletePath("/a")
+ assertFalse(zkClient.pathExists(path))
+
+ zkClient.createRecursive(path)
+ zkClient.deletePath(path, recursiveDelete = false)
+ assertFalse(zkClient.pathExists(path))
+ assertTrue(zkClient.pathExists("/a/b"))
}
@Test
@@ -1143,6 +1153,25 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertEquals(expectedConsumerGroupOffsetsPath, actualConsumerGroupOffsetsPath)
}
+ @Test
+ def testAclMethods(): Unit = {
+ val mockPath = "/foo"
+
+ intercept[NoNodeException] {
+ zkClient.getAcl(mockPath)
+ }
+
+ intercept[NoNodeException] {
+ zkClient.setAcl(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala)
+ }
+
+ zkClient.createRecursive(mockPath)
+
+ zkClient.setAcl(mockPath, ZooDefs.Ids.READ_ACL_UNSAFE.asScala)
+
+ assertEquals(ZooDefs.Ids.READ_ACL_UNSAFE.asScala, zkClient.getAcl(mockPath))
+ }
+
class ExpiredKafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time)
extends KafkaZkClient(zooKeeperClient, isSecure, time) {
// Overwriting this method from the parent class to force the client to re-register the Broker.
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 8d34c48..2f75fa2 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -43,7 +43,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
val zkSessionTimeout = 6000
val zkMaxInFlightRequests = Int.MaxValue
- protected val zkAclsEnabled: Option[Boolean] = None
+ protected def zkAclsEnabled: Option[Boolean] = None
var zkClient: KafkaZkClient = null
var adminZkClient: AdminZkClient = null