You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/03/20 08:46:17 UTC
kafka git commit: KAFKA-3328: SimpleAclAuthorizer can lose ACLs with frequent add/remov…
Repository: kafka
Updated Branches:
refs/heads/trunk eb823281a -> bfac36ad0
KAFKA-3328: SimpleAclAuthorizer can lose ACLs with frequent add/remov…
…e calls
Changes the SimpleAclAuthorizer to:
- Track and utilize the zookeeper version when updating zookeeper to prevent data loss in the case of stale reads and race conditions
- Update local cache when modifying ACLs
- Add debug logging
Author: Grant Henke <gr...@gmail.com>
Author: Grant Henke <gr...@users.noreply.github.com>
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Flavio Junqueira, Jun Rao, Ismael Juma, Gwen Shapira
Closes #1006 from granthenke/simple-authorizer-fix
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bfac36ad
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bfac36ad
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bfac36ad
Branch: refs/heads/trunk
Commit: bfac36ad0e378b5f39e3889e40a75c5c1fc48fa7
Parents: eb82328
Author: Grant Henke <gr...@gmail.com>
Authored: Sun Mar 20 00:46:12 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Sun Mar 20 00:46:12 2016 -0700
----------------------------------------------------------------------
.../security/auth/SimpleAclAuthorizer.scala | 219 +++++++++++++------
core/src/main/scala/kafka/utils/ZkUtils.scala | 39 ++--
.../security/auth/SimpleAclAuthorizerTest.scala | 86 +++++++-
.../test/scala/unit/kafka/utils/TestUtils.scala | 60 ++++-
4 files changed, 318 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bfac36ad/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 77e23f8..1a06af2 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -19,19 +19,20 @@ package kafka.security.auth
import java.util
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
-import org.apache.zookeeper.Watcher.Event.KeeperState
-
import kafka.network.RequestChannel.Session
+import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
-import org.I0Itec.zkclient.IZkStateListener
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import scala.collection.JavaConverters._
import org.apache.log4j.Logger
+import scala.util.Random
+
object SimpleAclAuthorizer {
//optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in
//same zookeeper where all other kafka broker info is stored.
@@ -62,6 +63,8 @@ object SimpleAclAuthorizer {
//prefix of all the change notification sequence node.
val AclChangedPrefix = "acl_changes_"
+
+ private case class VersionedAcls(acls: Set[Acl], zkVersion: Int)
}
class SimpleAclAuthorizer extends Authorizer with Logging {
@@ -71,9 +74,16 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
private var zkUtils: ZkUtils = null
private var aclChangeListener: ZkNodeChangeNotificationListener = null
- private val aclCache = new scala.collection.mutable.HashMap[Resource, Set[Acl]]
+ private val aclCache = new scala.collection.mutable.HashMap[Resource, VersionedAcls]
private val lock = new ReentrantReadWriteLock()
+ // The maximum number of times we should try to update the resource acls in zookeeper before failing;
+ // This should never occur, but is a safeguard just in case.
+ private val maxUpdateRetries = 10
+
+ private val retryBackoffMs = 100
+ private val retryBackoffJitterMs = 50
+
/**
* Guaranteed to be called before any authorize call is made.
*/
@@ -164,67 +174,51 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
override def addAcls(acls: Set[Acl], resource: Resource) {
if (acls != null && acls.nonEmpty) {
- val updatedAcls = getAcls(resource) ++ acls
- val path = toResourcePath(resource)
-
- if (zkUtils.pathExists(path))
- zkUtils.updatePersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
- else
- zkUtils.createPersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
-
- updateAclChangedFlag(resource)
+ inWriteLock(lock) {
+ updateResourceAcls(resource) { currentAcls =>
+ currentAcls ++ acls
+ }
+ }
}
}
override def removeAcls(aclsTobeRemoved: Set[Acl], resource: Resource): Boolean = {
- if (zkUtils.pathExists(toResourcePath(resource))) {
- val existingAcls = getAcls(resource)
- val filteredAcls = existingAcls.filter((acl: Acl) => !aclsTobeRemoved.contains(acl))
-
- val aclNeedsRemoval = (existingAcls != filteredAcls)
- if (aclNeedsRemoval) {
- val path: String = toResourcePath(resource)
- if (filteredAcls.nonEmpty)
- zkUtils.updatePersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(filteredAcls)))
- else
- zkUtils.deletePath(toResourcePath(resource))
-
- updateAclChangedFlag(resource)
+ inWriteLock(lock) {
+ updateResourceAcls(resource) { currentAcls =>
+ currentAcls -- aclsTobeRemoved
}
-
- aclNeedsRemoval
- } else false
+ }
}
override def removeAcls(resource: Resource): Boolean = {
- if (zkUtils.pathExists(toResourcePath(resource))) {
- zkUtils.deletePath(toResourcePath(resource))
+ inWriteLock(lock) {
+ val result = zkUtils.deletePath(toResourcePath(resource))
+ updateCache(resource, VersionedAcls(Set(), 0))
updateAclChangedFlag(resource)
- true
- } else false
+ result
+ }
}
override def getAcls(resource: Resource): Set[Acl] = {
inReadLock(lock) {
- aclCache.get(resource).getOrElse(Set.empty[Acl])
+ aclCache.get(resource).map(_.acls).getOrElse(Set.empty[Acl])
}
}
- private def getAclsFromZk(resource: Resource): Set[Acl] = {
- val aclJson = zkUtils.readDataMaybeNull(toResourcePath(resource))._1
- aclJson.map(Acl.fromJson).getOrElse(Set.empty)
- }
-
override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = {
- aclCache.mapValues { acls =>
- acls.filter(_.principal == principal)
- }.filter { case (_, acls) =>
- acls.nonEmpty
- }.toMap
+ inReadLock(lock) {
+ aclCache.mapValues { versionedAcls =>
+ versionedAcls.acls.filter(_.principal == principal)
+ }.filter { case (_, acls) =>
+ acls.nonEmpty
+ }.toMap
+ }
}
override def getAcls(): Map[Resource, Set[Acl]] = {
- aclCache.toMap
+ inReadLock(lock) {
+ aclCache.mapValues(_.acls).toMap
+ }
}
def close() {
@@ -233,25 +227,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}
private def loadCache() {
- var acls = Set.empty[Acl]
- val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
- for (rType <- resourceTypes) {
- val resourceType = ResourceType.fromString(rType)
- val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
- val resourceNames = zkUtils.getChildren(resourceTypePath)
- for (resourceName <- resourceNames) {
- acls = getAclsFromZk(Resource(resourceType, resourceName.toString))
- updateCache(new Resource(resourceType, resourceName), acls)
- }
- }
- }
-
- private def updateCache(resource: Resource, acls: Set[Acl]) {
inWriteLock(lock) {
- if (acls.nonEmpty)
- aclCache.put(resource, acls)
- else
- aclCache.remove(resource)
+ val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
+ for (rType <- resourceTypes) {
+ val resourceType = ResourceType.fromString(rType)
+ val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
+ val resourceNames = zkUtils.getChildren(resourceTypePath)
+ for (resourceName <- resourceNames) {
+ val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName.toString))
+ updateCache(new Resource(resourceType, resourceName), versionedAcls)
+ }
+ }
}
}
@@ -264,16 +250,117 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
authorizerLogger.debug(s"Principal = $principal is $permissionType Operation = $operation from host = $host on resource = $resource")
}
+ /**
+ * Safely updates the resources ACLs by ensuring reads and writes respect the expected zookeeper version.
+ * Continues to retry until it succesfully updates zookeeper.
+ *
+ * Returns a boolean indicating if the content of the ACLs was actually changed.
+ *
+ * @param resource the resource to change ACLs for
+ * @param getNewAcls function to transform existing acls to new ACLs
+ * @return boolean indicating if a change was made
+ */
+ private def updateResourceAcls(resource: Resource)(getNewAcls: Set[Acl] => Set[Acl]): Boolean = {
+ val path = toResourcePath(resource)
+
+ var currentVersionedAcls =
+ if (aclCache.contains(resource))
+ getAclsFromCache(resource)
+ else
+ getAclsFromZk(resource)
+ var newVersionedAcls: VersionedAcls = null
+ var writeComplete = false
+ var retries = 0
+ while (!writeComplete && retries <= maxUpdateRetries) {
+ val newAcls = getNewAcls(currentVersionedAcls.acls)
+ val data = Json.encode(Acl.toJsonCompatibleMap(newAcls))
+ val (updateSucceeded, updateVersion) =
+ if (!newAcls.isEmpty) {
+ updatePath(path, data, currentVersionedAcls.zkVersion)
+ } else {
+ trace(s"Deleting path for $resource because it had no ACLs remaining")
+ (zkUtils.conditionalDeletePath(path, currentVersionedAcls.zkVersion), 0)
+ }
+
+ if (!updateSucceeded) {
+ trace(s"Failed to update ACLs for $resource. Used version ${currentVersionedAcls.zkVersion}. Reading data and retrying update.")
+ Thread.sleep(backoffTime)
+ currentVersionedAcls = getAclsFromZk(resource);
+ retries += 1
+ } else {
+ newVersionedAcls = VersionedAcls(newAcls, updateVersion)
+ writeComplete = updateSucceeded
+ }
+ }
+
+ if(!writeComplete)
+ throw new IllegalStateException(s"Failed to update ACLs for $resource after trying a maximum of $maxUpdateRetries times")
+
+ if (newVersionedAcls.acls != currentVersionedAcls.acls) {
+ debug(s"Updated ACLs for $resource to ${newVersionedAcls.acls} with version ${newVersionedAcls.zkVersion}")
+ updateCache(resource, newVersionedAcls)
+ updateAclChangedFlag(resource)
+ true
+ } else {
+ debug(s"Updated ACLs for $resource, no change was made")
+ updateCache(resource, newVersionedAcls) // Even if no change, update the version
+ false
+ }
+ }
+
+ /**
+ * Updates a zookeeper path with an expected version. If the topic does not exist, it will create it.
+ * Returns if the update was successful and the new version.
+ */
+ private def updatePath(path: String, data: String, expectedVersion: Int): (Boolean, Int) = {
+ try {
+ zkUtils.conditionalUpdatePersistentPathIfExists(path, data, expectedVersion)
+ } catch {
+ case e: ZkNoNodeException =>
+ try {
+ debug(s"Node $path does not exist, attempting to create it.")
+ zkUtils.createPersistentPath(path, data)
+ (true, 0)
+ } catch {
+ case e: ZkNodeExistsException =>
+ debug(s"Failed to create node for $path because it already exists.")
+ (false, 0)
+ }
+ }
+ }
+
+ private def getAclsFromCache(resource: Resource): VersionedAcls = {
+ aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
+ }
+
+ private def getAclsFromZk(resource: Resource): VersionedAcls = {
+ val (aclJson, stat) = zkUtils.readDataMaybeNull(toResourcePath(resource))
+ VersionedAcls(aclJson.map(Acl.fromJson).getOrElse(Set()), stat.getVersion)
+ }
+
+ private def updateCache(resource: Resource, versionedAcls: VersionedAcls) {
+ if (versionedAcls.acls.nonEmpty) {
+ aclCache.put(resource, versionedAcls)
+ } else {
+ aclCache.remove(resource)
+ }
+ }
+
private def updateAclChangedFlag(resource: Resource) {
zkUtils.createSequentialPersistentPath(SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString)
}
- object AclChangedNotificationHandler extends NotificationHandler {
+ private def backoffTime = {
+ retryBackoffMs + Random.nextInt(retryBackoffJitterMs)
+ }
+ object AclChangedNotificationHandler extends NotificationHandler {
override def processNotification(notificationMessage: String) {
val resource: Resource = Resource.fromString(notificationMessage)
- val acls = getAclsFromZk(resource)
- updateCache(resource, acls)
+ inWriteLock(lock) {
+ val versionedAcls = getAclsFromZk(resource)
+ updateCache(resource, versionedAcls)
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bfac36ad/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 99c8196..49d3cfa 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -52,12 +52,12 @@ object ZkUtils {
val IsrChangeNotificationPath = "/isr_change_notification"
val EntityConfigPath = "/config"
val EntityConfigChangesPath = "/config/changes"
-
+
def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
new ZkUtils(zkClient, zkConnection, isZkSecurityEnabled)
}
-
+
/*
* Used in tests
*/
@@ -75,7 +75,7 @@ object ZkUtils {
val zkClient = new ZkClient(zkConnection, connectionTimeout, ZKStringSerializer)
(zkClient, zkConnection)
}
-
+
def DefaultAcls(isSecure: Boolean): java.util.List[ACL] = if (isSecure) {
val list = new java.util.ArrayList[ACL]
list.addAll(ZooDefs.Ids.CREATOR_ALL_ACL)
@@ -84,7 +84,7 @@ object ZkUtils {
} else {
ZooDefs.Ids.OPEN_ACL_UNSAFE
}
-
+
def maybeDeletePath(zkUrl: String, dir: String) {
try {
val zk = createZkClient(zkUrl, 30*1000, 30*1000)
@@ -94,7 +94,7 @@ object ZkUtils {
case _: Throwable => // swallow
}
}
-
+
/*
* Get calls that only depend on static paths
*/
@@ -111,7 +111,7 @@ object ZkUtils {
def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
getTopicPartitionPath(topic, partitionId) + "/" + "state"
-
+
def getEntityConfigRootPath(entityType: String): String =
ZkUtils.EntityConfigPath + "/" + entityType
@@ -122,7 +122,7 @@ object ZkUtils {
DeleteTopicsPath + "/" + topic
}
-class ZkUtils(val zkClient: ZkClient,
+class ZkUtils(val zkClient: ZkClient,
val zkConnection: ZkConnection,
val isSecure: Boolean) extends Logging {
// These are persistent ZK paths that should exist on kafka broker startup.
@@ -146,7 +146,7 @@ class ZkUtils(val zkClient: ZkClient,
IsrChangeNotificationPath)
val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure)
-
+
def getController(): Int = {
readDataMaybeNull(ControllerPath)._1 match {
case Some(controller) => KafkaController.parseControllerId(controller)
@@ -512,6 +512,19 @@ class ZkUtils(val zkClient: ZkClient,
}
}
+ /**
+ * Conditional delete the persistent path data, return true if it succeeds,
+ * otherwise (the current version is not the expected version)
+ */
+ def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = {
+ try {
+ zkClient.delete(path, expectedVersion)
+ true
+ } catch {
+ case e: KeeperException.BadVersionException => false
+ }
+ }
+
def deletePathRecursive(path: String) {
try {
zkClient.deleteRecursive(path)
@@ -847,7 +860,7 @@ class ZkUtils(val zkClient: ZkClient,
}
}
}
-
+
def close() {
if(zkClient != null) {
zkClient.close()
@@ -941,7 +954,7 @@ object ZkPath {
* znode is created and the create call returns OK. If
* the call receives a node exists event, then it checks
* if the session matches. If it does, then it returns OK,
- * and otherwise it fails the operation.
+ * and otherwise it fails the operation.
*/
class ZKCheckedEphemeral(path: String,
@@ -952,7 +965,7 @@ class ZKCheckedEphemeral(path: String,
private val getDataCallback = new GetDataCallback
val latch: CountDownLatch = new CountDownLatch(1)
var result: Code = Code.OK
-
+
private class CreateCallback extends StringCallback {
def processResult(rc: Int,
path: String,
@@ -1009,7 +1022,7 @@ class ZKCheckedEphemeral(path: String,
}
}
}
-
+
private def createEphemeral() {
zkHandle.create(path,
ZKStringSerializer.serialize(data),
@@ -1018,7 +1031,7 @@ class ZKCheckedEphemeral(path: String,
createCallback,
null)
}
-
+
private def createRecursive(prefix: String, suffix: String) {
debug("Path: %s, Prefix: %s, Suffix: %s".format(path, prefix, suffix))
if(suffix.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bfac36ad/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index efcf930..bdadb15 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -17,7 +17,7 @@
package kafka.security.auth
import java.net.InetAddress
-import java.util.UUID
+import java.util.{UUID}
import kafka.network.RequestChannel.Session
import kafka.security.auth.Acl.WildCardHost
@@ -31,6 +31,7 @@ import org.junit.{After, Before, Test}
class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
val simpleAclAuthorizer = new SimpleAclAuthorizer
+ val simpleAclAuthorizer2 = new SimpleAclAuthorizer
val testPrincipal = Acl.WildCardPrincipal
val testHostName = InetAddress.getByName("192.168.0.1")
val session = new Session(testPrincipal, testHostName)
@@ -48,12 +49,14 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
config = KafkaConfig.fromProps(props)
simpleAclAuthorizer.configure(config.originals)
+ simpleAclAuthorizer2.configure(config.originals)
resource = new Resource(Topic, UUID.randomUUID().toString)
}
@After
override def tearDown(): Unit = {
simpleAclAuthorizer.close()
+ simpleAclAuthorizer2.close()
}
@Test
@@ -254,6 +257,87 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
assertEquals(acls1, authorizer.getAcls(resource1))
}
+ @Test
+ def testLocalConcurrentModificationOfResourceAcls() {
+ val commonResource = new Resource(Topic, "test")
+
+ val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+ val acl1 = new Acl(user1, Allow, WildCardHost, Read)
+
+ val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
+ val acl2 = new Acl(user2, Deny, WildCardHost, Read)
+
+ simpleAclAuthorizer.addAcls(Set(acl1), commonResource)
+ simpleAclAuthorizer.addAcls(Set(acl2), commonResource)
+
+ TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
+ }
+
+ @Test
+ def testDistributedConcurrentModificationOfResourceAcls() {
+ val commonResource = new Resource(Topic, "test")
+
+ val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+ val acl1 = new Acl(user1, Allow, WildCardHost, Read)
+
+ val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
+ val acl2 = new Acl(user2, Deny, WildCardHost, Read)
+
+ // Add on each instance
+ simpleAclAuthorizer.addAcls(Set(acl1), commonResource)
+ simpleAclAuthorizer2.addAcls(Set(acl2), commonResource)
+
+ TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
+ TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer2, commonResource)
+
+ val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "joe")
+ val acl3 = new Acl(user3, Deny, WildCardHost, Read)
+
+ // Add on one instance and delete on another
+ simpleAclAuthorizer.addAcls(Set(acl3), commonResource)
+ val deleted = simpleAclAuthorizer2.removeAcls(Set(acl3), commonResource)
+
+ assertTrue("The authorizer should see a value that needs to be deleted", deleted)
+
+ TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
+ TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer2, commonResource)
+ }
+
+ @Test
+ def testHighConcurrencyModificationOfResourceAcls() {
+ val commonResource = new Resource(Topic, "test")
+
+ val acls = (0 to 100).map { i =>
+ val useri = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, i.toString)
+ new Acl(useri, Allow, WildCardHost, Read)
+ }
+
+ // Alternate authorizer, Remove all acls that end in 0
+ val concurrentFuctions = acls.map { acl =>
+ () => {
+ val aclId = acl.principal.getName.toInt
+ if (aclId % 2 == 0) {
+ simpleAclAuthorizer.addAcls(Set(acl), commonResource)
+ } else {
+ simpleAclAuthorizer2.addAcls(Set(acl), commonResource)
+ }
+ if (aclId % 10 == 0) {
+ simpleAclAuthorizer2.removeAcls(Set(acl), commonResource)
+ }
+ }
+ }
+
+ val expectedAcls = acls.filter { acl =>
+ val aclId = acl.principal.getName.toInt
+ aclId % 10 != 0
+ }.toSet
+
+ TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 15000)
+
+ TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer, commonResource)
+ TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer2, commonResource)
+ }
+
private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
var acls = originalAcls
http://git-wip-us.apache.org/repos/asf/kafka/blob/bfac36ad/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7b3e955..0730468 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -21,8 +21,9 @@ import java.io._
import java.nio._
import java.nio.file.Files
import java.nio.channels._
-import java.util.Random
-import java.util.Properties
+import java.util
+import java.util.concurrent.{Callable, TimeUnit, Executors}
+import java.util.{Collections, Random, Properties}
import java.security.cert.X509Certificate
import javax.net.ssl.X509TrustManager
import charset.Charset
@@ -54,6 +55,7 @@ import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
import scala.collection.Map
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
* Utility functions to help with testing
@@ -131,6 +133,7 @@ object TestUtils extends Logging {
/**
* Create a kafka server instance with appropriate test settings
* USING THIS IS A SIGN YOU ARE NOT WRITING A REAL UNIT TEST
+ *
* @param config The configuration of the server
*/
def createServer(config: KafkaConfig, time: Time = SystemTime): KafkaServer = {
@@ -141,7 +144,7 @@ object TestUtils extends Logging {
/**
* Create a test config for the provided parameters.
- *
+ *
* Note that if `interBrokerSecurityProtocol` is defined, the listener for the `SecurityProtocol` will be enabled.
*/
def createBrokerConfigs(numConfigs: Int,
@@ -281,6 +284,7 @@ object TestUtils extends Logging {
/**
* Wrap the message in a message set
+ *
* @param payload The bytes of the message
*/
def singleMessageSet(payload: Array[Byte],
@@ -291,6 +295,7 @@ object TestUtils extends Logging {
/**
* Generate an array of random bytes
+ *
* @param numBytes The size of the array
*/
def randomBytes(numBytes: Int): Array[Byte] = {
@@ -301,6 +306,7 @@ object TestUtils extends Logging {
/**
* Generate a random string of letters and digits of the given length
+ *
* @param len The length of the string
* @return The random string
*/
@@ -679,6 +685,7 @@ object TestUtils extends Logging {
* If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected.
* If oldLeaderOpt is defined, it waits until the new leader is different from the old leader.
* If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader.
+ *
* @return The new leader or assertion failure if timeout is reached.
*/
def waitUntilLeaderIsElectedOrChanged(zkUtils: ZkUtils, topic: String, partition: Int, timeoutMs: Long = 5000L,
@@ -786,6 +793,7 @@ object TestUtils extends Logging {
/**
* Wait until a valid leader is propagated to the metadata cache in each broker.
* It assumes that the leader propagated to each broker is the same.
+ *
* @param servers The list of servers that the metadata should reach to
* @param topic The topic name
* @param partition The partition Id
@@ -812,7 +820,7 @@ object TestUtils extends Logging {
}
def waitUntilLeaderIsKnown(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L): Unit = {
- TestUtils.waitUntilTrue(() =>
+ TestUtils.waitUntilTrue(() =>
servers.exists { server =>
server.replicaManager.getPartition(topic, partition).exists(_.leaderReplicaIfLocal().isDefined)
},
@@ -968,12 +976,11 @@ object TestUtils extends Logging {
/**
* Consume all messages (or a specific number of messages)
+ *
* @param topicMessageStreams the Topic Message Streams
* @param nMessagesPerThread an optional field to specify the exact number of messages to be returned.
* ConsumerTimeoutException will be thrown if there are no messages to be consumed.
* If not specified, then all available messages will be consumed, and no exception is thrown.
- *
- *
* @return the list of messages consumed.
*/
def getMessages(topicMessageStreams: Map[String, List[KafkaStream[String, String]]],
@@ -1033,6 +1040,7 @@ object TestUtils extends Logging {
/**
* Translate the given buffer into a string
+ *
* @param buffer The buffer to translate
* @param encoding The encoding to use in translating bytes to characters
*/
@@ -1075,6 +1083,46 @@ object TestUtils extends Logging {
s"expected acls $expected but got ${authorizer.getAcls(resource)}", waitTime = 10000)
}
+ /**
+ * To use this you pass in a sequence of functions that are your arrange/act/assert test on the SUT.
+ * They all run at the same time in the assertConcurrent method; the chances of triggering a multithreading code error,
+ * and thereby failing some assertion are greatly increased.
+ */
+ def assertConcurrent(message: String, functions: Seq[() => Any], timeoutMs: Int) {
+
+ def failWithTimeout() {
+ fail(s"$message. Timed out, the concurrent functions took more than $timeoutMs milliseconds")
+ }
+
+ val numThreads = functions.size
+ val threadPool = Executors.newFixedThreadPool(numThreads)
+ val exceptions = ArrayBuffer[Throwable]()
+ try {
+ val runnables = functions.map { function =>
+ new Callable[Unit] {
+ override def call(): Unit = function()
+ }
+ }.asJava
+ val futures = threadPool.invokeAll(runnables, timeoutMs, TimeUnit.MILLISECONDS).asScala
+ futures.foreach { future =>
+ if (future.isCancelled)
+ failWithTimeout()
+ else
+ try future.get()
+ catch { case e: Exception =>
+ exceptions += e
+ }
+ }
+ } catch {
+ case ie: InterruptedException => failWithTimeout()
+ case e => exceptions += e
+ } finally {
+ threadPool.shutdownNow()
+ }
+ assertTrue(s"$message failed with exception(s) $exceptions", exceptions.isEmpty)
+
+ }
+
}
class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {