You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/03/09 05:41:41 UTC

kafka git commit: KAFKA-4864; added correct zookeeper nodes for security migrator

Repository: kafka
Updated Branches:
  refs/heads/trunk 537f98a5d -> 294018a57


KAFKA-4864; added correct zookeeper nodes for security migrator

Author: simplesteph <st...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #2655 from simplesteph/fix-security-migrator-tool


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/294018a5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/294018a5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/294018a5

Branch: refs/heads/trunk
Commit: 294018a578cbbc187ac123a6b99990468186e349
Parents: 537f98a
Author: simplesteph <st...@gmail.com>
Authored: Wed Mar 8 21:41:38 2017 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Mar 8 21:41:38 2017 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  2 +-
 .../scala/kafka/admin/ZkSecurityMigrator.scala  |  4 +-
 .../security/auth/SimpleAclAuthorizer.scala     |  4 +-
 .../kafka/server/DynamicConfigManager.scala     |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 56 ++++++++++++--------
 .../SaslScramSslEndToEndAuthorizationTest.scala |  2 +-
 .../unit/kafka/admin/TopicCommandTest.scala     |  2 +-
 .../security/auth/ZkAuthorizationTest.scala     |  6 +--
 .../kafka/server/DynamicConfigChangeTest.scala  |  2 +-
 9 files changed, 46 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 65ac91c..d4ae4ff 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -565,7 +565,7 @@ object AdminUtils extends Logging with AdminUtilities {
     writeEntityConfig(zkUtils, entityConfigPath, configs)
 
     // create the change notification
-    val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
+    val seqNode = ZkUtils.ConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
     val content = Json.encode(getConfigChangeZnodeData(sanitizedEntityPath))
     zkUtils.zkClient.createPersistentSequential(seqNode, content)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 9bd321c..eb5c142 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -220,12 +220,12 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
   private def run(): Unit = {
     try {
       setAclIndividually("/")
-      for (path <- zkUtils.securePersistentZkPaths) {
+      for (path <- ZkUtils.SecureZkRootPaths) {
         debug("Going to set ACL for %s".format(path))
         zkUtils.makeSurePersistentPathExists(path)
         setAclsRecursively(path)
       }
-      
+
       @tailrec
       def recurse(): Unit = {
         val future = futures.synchronized { 

http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 7ae4796..51de3bc 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -56,10 +56,10 @@ object SimpleAclAuthorizer {
    * /kafka-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
    * </pre>
    */
-  val AclZkPath = "/kafka-acl"
+  val AclZkPath = ZkUtils.KafkaAclPath
 
   //notification node which gets updated with the resource name when acl on a resource is changed.
-  val AclChangedZkPath = "/kafka-acl-changes"
+  val AclChangedZkPath = ZkUtils.KafkaAclChangesPath
 
   //prefix of all the change notification sequence node.
   val AclChangedPrefix = "acl_changes_"

http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index e0e6a03..c81ce6c 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -148,7 +148,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
     }
   }
 
-  private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.EntityConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)
+  private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.ConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)
 
   /**
    * Begin watching for config changes

http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 7a6bd63..e67e264 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -43,19 +43,40 @@ import scala.collection._
 import scala.collection.JavaConverters._
 
 object ZkUtils {
-  val ConsumersPath = "/consumers"
-  val ClusterIdPath = "/cluster/id"
-  val BrokerIdsPath = "/brokers/ids"
-  val BrokerTopicsPath = "/brokers/topics"
+
+
+  // Important: it is necessary to add any new top level Zookeeper path here
+  val AdminPath = "/admin"
+  val BrokersPath = "/brokers"
+  val ClusterPath = "/cluster"
+  val ConfigPath = "/config"
   val ControllerPath = "/controller"
   val ControllerEpochPath = "/controller_epoch"
-  val ReassignPartitionsPath = "/admin/reassign_partitions"
-  val DeleteTopicsPath = "/admin/delete_topics"
-  val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
-  val BrokerSequenceIdPath = "/brokers/seqid"
   val IsrChangeNotificationPath = "/isr_change_notification"
-  val EntityConfigPath = "/config"
-  val EntityConfigChangesPath = "/config/changes"
+  val KafkaAclPath = "/kafka-acl"
+  val KafkaAclChangesPath = "/kafka-acl-changes"
+
+  val ConsumersPath = "/consumers"
+  val ClusterIdPath = s"$ClusterPath/id"
+  val BrokerIdsPath = s"$BrokersPath/ids"
+  val BrokerTopicsPath = s"$BrokersPath/topics"
+  val ReassignPartitionsPath = s"$AdminPath/reassign_partitions"
+  val DeleteTopicsPath = s"$AdminPath/delete_topics"
+  val PreferredReplicaLeaderElectionPath = s"$AdminPath/preferred_replica_election"
+  val BrokerSequenceIdPath = s"$BrokersPath/seqid"
+  val ConfigChangesPath = s"$ConfigPath/changes"
+
+
+  // Important: it is necessary to add any new top level Zookeeper path to the Seq
+  val SecureZkRootPaths = Seq(AdminPath,
+                              BrokersPath,
+                              ClusterPath,
+                              ConfigPath,
+                              ControllerPath,
+                              ControllerEpochPath,
+                              IsrChangeNotificationPath,
+                              KafkaAclPath,
+                              KafkaAclChangesPath)
 
   def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
     val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
@@ -117,13 +138,13 @@ object ZkUtils {
     getTopicPartitionPath(topic, partitionId) + "/" + "state"
 
   def getEntityConfigRootPath(entityType: String): String =
-    ZkUtils.EntityConfigPath + "/" + entityType
+    ZkUtils.ConfigPath + "/" + entityType
 
   def getEntityConfigPath(entityType: String, entity: String): String =
     getEntityConfigRootPath(entityType) + "/" + entity
 
   def getEntityConfigPath(entityPath: String): String =
-    ZkUtils.EntityConfigPath + "/" + entityPath
+    ZkUtils.ConfigPath + "/" + entityPath
 
   def getDeleteTopicPath(topic: String): String =
     DeleteTopicsPath + "/" + topic
@@ -191,22 +212,13 @@ class ZkUtils(val zkClient: ZkClient,
   val persistentZkPaths = Seq(ConsumersPath,
                               BrokerIdsPath,
                               BrokerTopicsPath,
-                              EntityConfigChangesPath,
+                              ConfigChangesPath,
                               getEntityConfigRootPath(ConfigType.Topic),
                               getEntityConfigRootPath(ConfigType.Client),
                               DeleteTopicsPath,
                               BrokerSequenceIdPath,
                               IsrChangeNotificationPath)
 
-  val securePersistentZkPaths = Seq(BrokerIdsPath,
-                                    BrokerTopicsPath,
-                                    EntityConfigChangesPath,
-                                    getEntityConfigRootPath(ConfigType.Topic),
-                                    getEntityConfigRootPath(ConfigType.Client),
-                                    DeleteTopicsPath,
-                                    BrokerSequenceIdPath,
-                                    IsrChangeNotificationPath)
-
   val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure)
 
   def getController(): Int = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index fe0204a..86db407 100644
--- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -32,7 +32,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
 
   override def configureSecurityBeforeServersStart() {
     super.configureSecurityBeforeServersStart()
-    zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath)
+    zkUtils.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath)
 
     def configCommandArgs(username: String, password: String) : Array[String] = {
       val credentials = kafkaServerSaslMechanisms.map(m => s"$m=[iterations=4096,password=$password]")

http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 8ce7c90..5215867 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -49,7 +49,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
 
     // pre-create the topic config changes path to avoid a NoNodeException
-    zkUtils.createPersistentPath(EntityConfigChangesPath)
+    zkUtils.createPersistentPath(ConfigChangesPath)
 
     // modify the topic to add new partitions
     val numPartitionsModified = 3

http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 8cec0c7..3b4c48e 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -153,7 +153,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
   @Test
   def testDeleteRecursive() {
     info(s"zkConnect string: $zkConnect")
-    for (path <- zkUtils.securePersistentZkPaths) {
+    for (path <- ZkUtils.SecureZkRootPaths) {
       info(s"Creating $path")
       zkUtils.makeSurePersistentPathExists(path)
       zkUtils.createPersistentPath(s"$path/fpjwashere", "")
@@ -185,7 +185,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    */
   private def testMigration(zkUrl: String, firstZk: ZkUtils, secondZk: ZkUtils) {
     info(s"zkConnect string: $zkUrl")
-    for (path <- firstZk.securePersistentZkPaths) {
+    for (path <- ZkUtils.SecureZkRootPaths) {
       info(s"Creating $path")
       firstZk.makeSurePersistentPathExists(path)
       // Create a child for each znode to exercise the recurrent
@@ -206,7 +206,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
       }
     ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl"))
     info("Done with migration")
-    for (path <- secondZk.securePersistentZkPaths) {
+    for (path <- ZkUtils.SecureZkRootPaths) {
       val listParent = secondZk.zkConnection.getAcl(path).getKey
       assertTrue(path, isAclCorrect(listParent, secondZk.isSecure))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index cf0dc6f..dc30fb2 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -148,7 +148,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "ANONYMOUS/clients/overriddenUserClientId", userClientIdProps)
 
     // Remove config change znodes to force quota initialization only through loading of user/client quotas
-    zkUtils.getChildren(ZkUtils.EntityConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.EntityConfigChangesPath + "/" + p) }
+    zkUtils.getChildren(ZkUtils.ConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.ConfigChangesPath + "/" + p) }
     server.startup()
     val quotaManagers = server.apis.quotas