You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/08 03:14:04 UTC

kafka git commit: KAFKA-2587: Only the notification handler will update the cache, and all verifications will use waitUntilTrue.

Repository: kafka
Updated Branches:
  refs/heads/trunk 118912e76 -> 2254f2bfa


KAFKA-2587: Only the notification handler will update the cache, and all verifications will use waitUntilTrue.

Author: Parth Brahmbhatt <br...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #277 from Parth-Brahmbhatt/KAFKA-2587


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2254f2bf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2254f2bf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2254f2bf

Branch: refs/heads/trunk
Commit: 2254f2bfaf5a6a2332ec390471ff3a7c34b8b890
Parents: 118912e
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Oct 7 18:13:58 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Oct 7 18:13:58 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/security/auth/SimpleAclAuthorizer.scala  | 3 ---
 core/src/test/scala/unit/kafka/admin/AclCommandTest.scala     | 6 +++---
 .../unit/kafka/security/auth/SimpleAclAuthorizerTest.scala    | 7 +++----
 core/src/test/scala/unit/kafka/utils/TestUtils.scala          | 6 ++++++
 4 files changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2254f2bf/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 6576264..2e5ee8d 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -168,7 +168,6 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
         ZkUtils.createPersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
 
       updateAclChangedFlag(resource)
-      updateCache(resource, updatedAcls)
     }
   }
 
@@ -186,7 +185,6 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
           ZkUtils.deletePath(zkClient, toResourcePath(resource))
 
         updateAclChangedFlag(resource)
-        updateCache(resource, filteredAcls)
       }
 
       aclNeedsRemoval
@@ -197,7 +195,6 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     if (ZkUtils.pathExists(zkClient, toResourcePath(resource))) {
       ZkUtils.deletePath(zkClient, toResourcePath(resource))
       updateAclChangedFlag(resource)
-      updateCache(resource, Set.empty[Acl])
       true
     } else false
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2254f2bf/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index f6d3667..e2a75e2 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -78,7 +78,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
         val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
           AclCommand.main(args ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
           for (resource <- resources) {
-            Assert.assertEquals(acls, getAuthorizer(brokerProps).getAcls(resource))
+            TestUtils.waitAndVerifyAcls(acls, getAuthorizer(brokerProps), resource)
           }
 
           testRemove(resources, resourceCmd, args, brokerProps)
@@ -97,7 +97,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
       AclCommand.main(args ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
       for ((resources, acls) <- resourcesToAcls) {
         for (resource <- resources) {
-          Assert.assertEquals(acls, getAuthorizer(brokerProps).getAcls(resource))
+          TestUtils.waitAndVerifyAcls(acls, getAuthorizer(brokerProps), resource)
         }
       }
       testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand, args, brokerProps)
@@ -108,7 +108,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
     for (resource <- resources) {
       Console.withIn(new StringReader(s"y${AclCommand.Newline}" * resources.size)) {
         AclCommand.main(args ++ resourceCmd :+ "--remove")
-        Assert.assertEquals(Set.empty[Acl], getAuthorizer(brokerProps).getAcls(resource))
+        TestUtils.waitAndVerifyAcls(Set.empty[Acl], getAuthorizer(brokerProps), resource)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2254f2bf/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 3276a79..655bc20 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -211,14 +211,14 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     )
 
     resourceToAcls foreach { case (key, value) => changeAclAndVerify(Set.empty[Acl], value, Set.empty[Acl], key) }
-    assertEquals(resourceToAcls + (resource -> acls), simpleAclAuthorizer.getAcls())
+    TestUtils.waitUntilTrue(() => resourceToAcls + (resource -> acls) == simpleAclAuthorizer.getAcls(), "changes not propagated in timeout period.")
 
     //test remove acl from existing acls.
     acls = changeAclAndVerify(acls, Set.empty[Acl], Set(acl1, acl5))
 
     //test remove all acls for resource
     simpleAclAuthorizer.removeAcls(resource)
-    TestUtils.waitUntilTrue(() => simpleAclAuthorizer.getAcls(resource) == Set.empty[Acl], "changes not propagated in timeout period.")
+    TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
     assertTrue(!ZkUtils.pathExists(zkClient, simpleAclAuthorizer.toResourcePath(resource)))
 
     //test removing last acl also deletes zookeeper path
@@ -261,8 +261,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
       acls --=removedAcls
     }
 
-    TestUtils.waitUntilTrue(() => simpleAclAuthorizer.getAcls(resource) == acls,
-      s"changes not propagated in timeout period. expected $acls but got ${simpleAclAuthorizer.getAcls(resource)}", waitTime = 10000)
+    TestUtils.waitAndVerifyAcls(acls, simpleAclAuthorizer, resource)
 
     acls
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2254f2bf/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 8e4c263..4a53e11 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,6 +26,7 @@ import java.security.cert.X509Certificate
 import javax.net.ssl.X509TrustManager
 import charset.Charset
 
+import kafka.security.auth.{Resource, Authorizer, Acl}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.Utils._
 
@@ -942,6 +943,11 @@ object TestUtils extends Logging {
     trustManager
   }
 
+  def waitAndVerifyAcls(expected: Set[Acl], authorizer: Authorizer, resource: Resource) = {
+    TestUtils.waitUntilTrue(() => authorizer.getAcls(resource) == expected,
+      s"expected acls $expected but got ${authorizer.getAcls(resource)}", waitTime = 10000)
+  }
+
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {