You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/03/09 05:41:41 UTC
kafka git commit: KAFKA-4864; added correct zookeeper nodes for security migrator
Repository: kafka
Updated Branches:
refs/heads/trunk 537f98a5d -> 294018a57
KAFKA-4864; added correct zookeeper nodes for security migrator
Author: simplesteph <st...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #2655 from simplesteph/fix-security-migrator-tool
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/294018a5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/294018a5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/294018a5
Branch: refs/heads/trunk
Commit: 294018a578cbbc187ac123a6b99990468186e349
Parents: 537f98a
Author: simplesteph <st...@gmail.com>
Authored: Wed Mar 8 21:41:38 2017 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Mar 8 21:41:38 2017 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 2 +-
.../scala/kafka/admin/ZkSecurityMigrator.scala | 4 +-
.../security/auth/SimpleAclAuthorizer.scala | 4 +-
.../kafka/server/DynamicConfigManager.scala | 2 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 56 ++++++++++++--------
.../SaslScramSslEndToEndAuthorizationTest.scala | 2 +-
.../unit/kafka/admin/TopicCommandTest.scala | 2 +-
.../security/auth/ZkAuthorizationTest.scala | 6 +--
.../kafka/server/DynamicConfigChangeTest.scala | 2 +-
9 files changed, 46 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 65ac91c..d4ae4ff 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -565,7 +565,7 @@ object AdminUtils extends Logging with AdminUtilities {
writeEntityConfig(zkUtils, entityConfigPath, configs)
// create the change notification
- val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
+ val seqNode = ZkUtils.ConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
val content = Json.encode(getConfigChangeZnodeData(sanitizedEntityPath))
zkUtils.zkClient.createPersistentSequential(seqNode, content)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 9bd321c..eb5c142 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -220,12 +220,12 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
private def run(): Unit = {
try {
setAclIndividually("/")
- for (path <- zkUtils.securePersistentZkPaths) {
+ for (path <- ZkUtils.SecureZkRootPaths) {
debug("Going to set ACL for %s".format(path))
zkUtils.makeSurePersistentPathExists(path)
setAclsRecursively(path)
}
-
+
@tailrec
def recurse(): Unit = {
val future = futures.synchronized {
http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 7ae4796..51de3bc 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -56,10 +56,10 @@ object SimpleAclAuthorizer {
* /kafka-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
* </pre>
*/
- val AclZkPath = "/kafka-acl"
+ val AclZkPath = ZkUtils.KafkaAclPath
//notification node which gets updated with the resource name when acl on a resource is changed.
- val AclChangedZkPath = "/kafka-acl-changes"
+ val AclChangedZkPath = ZkUtils.KafkaAclChangesPath
//prefix of all the change notification sequence node.
val AclChangedPrefix = "acl_changes_"
http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index e0e6a03..c81ce6c 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -148,7 +148,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
}
}
- private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.EntityConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)
+ private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.ConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)
/**
* Begin watching for config changes
http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 7a6bd63..e67e264 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -43,19 +43,40 @@ import scala.collection._
import scala.collection.JavaConverters._
object ZkUtils {
- val ConsumersPath = "/consumers"
- val ClusterIdPath = "/cluster/id"
- val BrokerIdsPath = "/brokers/ids"
- val BrokerTopicsPath = "/brokers/topics"
+
+
+ // Important: it is necessary to add any new top level Zookeeper path here
+ val AdminPath = "/admin"
+ val BrokersPath = "/brokers"
+ val ClusterPath = "/cluster"
+ val ConfigPath = "/config"
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
- val ReassignPartitionsPath = "/admin/reassign_partitions"
- val DeleteTopicsPath = "/admin/delete_topics"
- val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
- val BrokerSequenceIdPath = "/brokers/seqid"
val IsrChangeNotificationPath = "/isr_change_notification"
- val EntityConfigPath = "/config"
- val EntityConfigChangesPath = "/config/changes"
+ val KafkaAclPath = "/kafka-acl"
+ val KafkaAclChangesPath = "/kafka-acl-changes"
+
+ val ConsumersPath = "/consumers"
+ val ClusterIdPath = s"$ClusterPath/id"
+ val BrokerIdsPath = s"$BrokersPath/ids"
+ val BrokerTopicsPath = s"$BrokersPath/topics"
+ val ReassignPartitionsPath = s"$AdminPath/reassign_partitions"
+ val DeleteTopicsPath = s"$AdminPath/delete_topics"
+ val PreferredReplicaLeaderElectionPath = s"$AdminPath/preferred_replica_election"
+ val BrokerSequenceIdPath = s"$BrokersPath/seqid"
+ val ConfigChangesPath = s"$ConfigPath/changes"
+
+
+ // Important: it is necessary to add any new top level Zookeeper path to the Seq
+ val SecureZkRootPaths = Seq(AdminPath,
+ BrokersPath,
+ ClusterPath,
+ ConfigPath,
+ ControllerPath,
+ ControllerEpochPath,
+ IsrChangeNotificationPath,
+ KafkaAclPath,
+ KafkaAclChangesPath)
def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
@@ -117,13 +138,13 @@ object ZkUtils {
getTopicPartitionPath(topic, partitionId) + "/" + "state"
def getEntityConfigRootPath(entityType: String): String =
- ZkUtils.EntityConfigPath + "/" + entityType
+ ZkUtils.ConfigPath + "/" + entityType
def getEntityConfigPath(entityType: String, entity: String): String =
getEntityConfigRootPath(entityType) + "/" + entity
def getEntityConfigPath(entityPath: String): String =
- ZkUtils.EntityConfigPath + "/" + entityPath
+ ZkUtils.ConfigPath + "/" + entityPath
def getDeleteTopicPath(topic: String): String =
DeleteTopicsPath + "/" + topic
@@ -191,22 +212,13 @@ class ZkUtils(val zkClient: ZkClient,
val persistentZkPaths = Seq(ConsumersPath,
BrokerIdsPath,
BrokerTopicsPath,
- EntityConfigChangesPath,
+ ConfigChangesPath,
getEntityConfigRootPath(ConfigType.Topic),
getEntityConfigRootPath(ConfigType.Client),
DeleteTopicsPath,
BrokerSequenceIdPath,
IsrChangeNotificationPath)
- val securePersistentZkPaths = Seq(BrokerIdsPath,
- BrokerTopicsPath,
- EntityConfigChangesPath,
- getEntityConfigRootPath(ConfigType.Topic),
- getEntityConfigRootPath(ConfigType.Client),
- DeleteTopicsPath,
- BrokerSequenceIdPath,
- IsrChangeNotificationPath)
-
val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure)
def getController(): Int = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index fe0204a..86db407 100644
--- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -32,7 +32,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
override def configureSecurityBeforeServersStart() {
super.configureSecurityBeforeServersStart()
- zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath)
+ zkUtils.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath)
def configCommandArgs(username: String, password: String) : Array[String] = {
val credentials = kafkaServerSaslMechanisms.map(m => s"$m=[iterations=4096,password=$password]")
http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 8ce7c90..5215867 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -49,7 +49,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
// pre-create the topic config changes path to avoid a NoNodeException
- zkUtils.createPersistentPath(EntityConfigChangesPath)
+ zkUtils.createPersistentPath(ConfigChangesPath)
// modify the topic to add new partitions
val numPartitionsModified = 3
http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 8cec0c7..3b4c48e 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -153,7 +153,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
@Test
def testDeleteRecursive() {
info(s"zkConnect string: $zkConnect")
- for (path <- zkUtils.securePersistentZkPaths) {
+ for (path <- ZkUtils.SecureZkRootPaths) {
info(s"Creating $path")
zkUtils.makeSurePersistentPathExists(path)
zkUtils.createPersistentPath(s"$path/fpjwashere", "")
@@ -185,7 +185,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
*/
private def testMigration(zkUrl: String, firstZk: ZkUtils, secondZk: ZkUtils) {
info(s"zkConnect string: $zkUrl")
- for (path <- firstZk.securePersistentZkPaths) {
+ for (path <- ZkUtils.SecureZkRootPaths) {
info(s"Creating $path")
firstZk.makeSurePersistentPathExists(path)
// Create a child for each znode to exercise the recurrent
@@ -206,7 +206,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
}
ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl"))
info("Done with migration")
- for (path <- secondZk.securePersistentZkPaths) {
+ for (path <- ZkUtils.SecureZkRootPaths) {
val listParent = secondZk.zkConnection.getAcl(path).getKey
assertTrue(path, isAclCorrect(listParent, secondZk.isSecure))
http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index cf0dc6f..dc30fb2 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -148,7 +148,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "ANONYMOUS/clients/overriddenUserClientId", userClientIdProps)
// Remove config change znodes to force quota initialization only through loading of user/client quotas
- zkUtils.getChildren(ZkUtils.EntityConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.EntityConfigChangesPath + "/" + p) }
+ zkUtils.getChildren(ZkUtils.ConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.ConfigChangesPath + "/" + p) }
server.startup()
val quotaManagers = server.apis.quotas