You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/11/22 21:25:59 UTC
[3/3] kafka git commit: KAFKA-5646; Use KafkaZkClient in DynamicConfigManager and AdminManager
KAFKA-5646; Use KafkaZkClient in DynamicConfigManager and AdminManager
* Add AdminZkClient class
* Use KafkaZkClient, AdminZkClient in ConfigCommand, TopicCommand
* All existing tests should continue to pass
Author: Manikumar Reddy <ma...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #4194 from omkreddy/KAFKA-5646-ZK-ADMIN-UTILS-DYNAMIC-MANAGER
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc852baf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc852baf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc852baf
Branch: refs/heads/trunk
Commit: bc852baffbf602ead9cb719a01747de414940d53
Parents: c5f31fe
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Wed Nov 22 13:25:52 2017 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 22 13:25:52 2017 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 15 +
.../main/scala/kafka/admin/ConfigCommand.scala | 37 +-
.../main/scala/kafka/admin/TopicCommand.scala | 86 ++--
.../main/scala/kafka/server/AdminManager.scala | 32 +-
.../kafka/server/DynamicConfigManager.scala | 16 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 10 +-
.../main/scala/kafka/server/KafkaServer.scala | 6 +-
.../src/main/scala/kafka/zk/AdminZkClient.scala | 417 +++++++++++++++++++
.../src/main/scala/kafka/zk/KafkaZkClient.scala | 339 ++++++++++++---
core/src/main/scala/kafka/zk/ZkData.scala | 27 +-
.../ReassignPartitionsIntegrationTest.scala | 2 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 5 +-
.../kafka/api/BaseProducerSendTest.scala | 3 +-
.../kafka/api/ClientIdQuotaTest.scala | 3 +-
.../api/RackAwareAutoTopicCreationTest.scala | 4 +-
.../kafka/api/UserClientIdQuotaTest.scala | 9 +-
.../integration/kafka/api/UserQuotaTest.scala | 5 +-
.../ReplicaFetcherThreadFatalErrorTest.scala | 3 +-
.../unit/kafka/admin/AddPartitionsTest.scala | 12 +-
.../unit/kafka/admin/ConfigCommandTest.scala | 78 ++--
.../unit/kafka/admin/DeleteTopicTest.scala | 38 +-
.../kafka/admin/DescribeConsumerGroupTest.scala | 4 +-
.../kafka/admin/ListConsumerGroupTest.scala | 3 +-
.../admin/ResetConsumerGroupOffsetTest.scala | 80 ++--
.../unit/kafka/admin/TopicCommandTest.scala | 44 +-
.../controller/ControllerFailoverTest.scala | 3 +-
...MetricsDuringTopicCreationDeletionTest.scala | 5 +-
.../kafka/integration/TopicMetadataTest.scala | 3 +-
.../integration/UncleanLeaderElectionTest.scala | 11 +-
.../scala/unit/kafka/metrics/MetricsTest.scala | 9 +-
.../unit/kafka/producer/SyncProducerTest.scala | 8 +-
.../kafka/server/DynamicConfigChangeTest.scala | 24 +-
.../unit/kafka/server/DynamicConfigTest.scala | 29 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 1 -
.../scala/unit/kafka/server/LogOffsetTest.scala | 9 +-
.../unit/kafka/server/OffsetCommitTest.scala | 4 +-
.../kafka/server/ReplicationQuotasTest.scala | 16 +-
.../unit/kafka/server/RequestQuotaTest.scala | 5 +-
...rivenReplicationProtocolAcceptanceTest.scala | 9 +-
.../epoch/LeaderEpochIntegrationTest.scala | 7 +-
.../scala/unit/kafka/zk/AdminZkClientTest.scala | 323 ++++++++++++++
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 101 ++++-
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 3 +
43 files changed, 1459 insertions(+), 389 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 32cab2a..09a65af 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -32,6 +32,7 @@ import scala.collection.JavaConverters._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.apache.kafka.common.internals.Topic
+@deprecated("This class is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
trait AdminUtilities {
def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties)
def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties)
@@ -267,6 +268,7 @@ object AdminUtils extends Logging with AdminUtilities {
* @param validateOnly If true, validate the parameters without actually adding the partitions
* @return the updated replica assignment
*/
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def addPartitions(zkUtils: ZkUtils,
topic: String,
existingAssignment: Map[Int, Seq[Int]],
@@ -359,6 +361,7 @@ object AdminUtils extends Logging with AdminUtilities {
}
}
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def deleteTopic(zkUtils: ZkUtils, topic: String) {
if (topicExists(zkUtils, topic)) {
try {
@@ -434,6 +437,7 @@ object AdminUtils extends Logging with AdminUtilities {
def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
zkUtils.pathExists(getTopicPath(topic))
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
val allBrokers = zkUtils.getAllBrokersInCluster()
@@ -452,6 +456,7 @@ object AdminUtils extends Logging with AdminUtilities {
brokerMetadatas.sortBy(_.id)
}
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,
@@ -463,6 +468,7 @@ object AdminUtils extends Logging with AdminUtilities {
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def validateCreateOrUpdateTopic(zkUtils: ZkUtils,
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
@@ -501,6 +507,7 @@ object AdminUtils extends Logging with AdminUtilities {
LogConfig.validate(config)
}
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
@@ -548,6 +555,7 @@ object AdminUtils extends Logging with AdminUtilities {
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def changeClientIdConfig(zkUtils: ZkUtils, sanitizedClientId: String, configs: Properties) {
DynamicConfig.Client.validate(configs)
changeEntityConfig(zkUtils, ConfigType.Client, sanitizedClientId, configs)
@@ -564,6 +572,7 @@ object AdminUtils extends Logging with AdminUtilities {
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) {
if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
DynamicConfig.Client.validate(configs)
@@ -589,6 +598,7 @@ object AdminUtils extends Logging with AdminUtilities {
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
validateTopicConfig(zkUtils, topic, configs)
changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
@@ -602,6 +612,7 @@ object AdminUtils extends Logging with AdminUtilities {
* @param brokers: The list of brokers to apply config changes to
* @param configs: The config to change, as properties
*/
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def changeBrokerConfig(zkUtils: ZkUtils, brokers: Seq[Int], configs: Properties): Unit = {
DynamicConfig.Broker.validate(configs)
brokers.foreach { broker =>
@@ -637,6 +648,7 @@ object AdminUtils extends Logging with AdminUtilities {
* Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk
* sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>.
*/
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, sanitizedEntityName: String): Properties = {
val entityConfigPath = getEntityConfigPath(rootEntityType, sanitizedEntityName)
// readDataMaybeNull returns Some(null) if the path exists, but there is no data
@@ -657,12 +669,15 @@ object AdminUtils extends Logging with AdminUtilities {
props
}
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = {
def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = {
val root = rootPath match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index febf40f..077ecce 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -24,8 +24,10 @@ import kafka.common.Config
import kafka.common.InvalidConfigException
import kafka.log.LogConfig
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
-import kafka.utils.{CommandLineUtils, ZkUtils}
+import kafka.utils.CommandLineUtils
import kafka.utils.Implicits._
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram._
import org.apache.kafka.common.utils.{Sanitizer, Utils}
@@ -61,26 +63,25 @@ object ConfigCommand extends Config {
opts.checkArgs()
- val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
- 30000,
- 30000,
- JaasUtils.isZkSecurityEnabled())
+ val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, Int.MaxValue)
+ val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled())
+ val adminZkClient = new AdminZkClient(zkClient)
try {
if (opts.options.has(opts.alterOpt))
- alterConfig(zkUtils, opts)
+ alterConfig(zkClient, opts, adminZkClient)
else if (opts.options.has(opts.describeOpt))
- describeConfig(zkUtils, opts)
+ describeConfig(zkClient, opts, adminZkClient)
} catch {
case e: Throwable =>
println("Error while executing config command " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
- zkUtils.close()
+ zkClient.close()
}
}
- private[admin] def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions, utils: AdminUtilities = AdminUtils) {
+ private[admin] def alterConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
val configsToBeAdded = parseConfigsToBeAdded(opts)
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
val entity = parseEntity(opts)
@@ -91,7 +92,7 @@ object ConfigCommand extends Config {
preProcessScramCredentials(configsToBeAdded)
// compile the final set of configs
- val configs = utils.fetchEntityConfig(zkUtils, entityType, entityName)
+ val configs = adminZkClient.fetchEntityConfig(entityType, entityName)
// fail the command if any of the configs to be deleted does not exist
val invalidConfigs = configsToBeDeleted.filterNot(configs.containsKey(_))
@@ -101,7 +102,7 @@ object ConfigCommand extends Config {
configs ++= configsToBeAdded
configsToBeDeleted.foreach(configs.remove(_))
- utils.changeConfigs(zkUtils, entityType, entityName, configs)
+ adminZkClient.changeConfigs(entityType, entityName, configs)
println(s"Completed Updating config for entity: $entity.")
}
@@ -127,12 +128,12 @@ object ConfigCommand extends Config {
}
}
- private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
+ private def describeConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
val configEntity = parseEntity(opts)
val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined
- val entities = configEntity.getAllEntities(zkUtils)
+ val entities = configEntity.getAllEntities(zkClient)
for (entity <- entities) {
- val configs = AdminUtils.fetchEntityConfig(zkUtils, entity.root.entityType, entity.fullSanitizedName)
+ val configs = adminZkClient.fetchEntityConfig(entity.root.entityType, entity.fullSanitizedName)
// When describing all users, don't include empty user nodes with only <user, client> quota overrides.
if (!configs.isEmpty || !describeAllUsers) {
println("Configs for %s are %s"
@@ -196,7 +197,7 @@ object ConfigCommand extends Config {
case class ConfigEntity(root: Entity, child: Option[Entity]) {
val fullSanitizedName = root.sanitizedName.getOrElse("") + child.map(s => "/" + s.entityPath).getOrElse("")
- def getAllEntities(zkUtils: ZkUtils) : Seq[ConfigEntity] = {
+ def getAllEntities(zkClient: KafkaZkClient) : Seq[ConfigEntity] = {
// Describe option examples:
// Describe entity with specified name:
// --entity-type topics --entity-name topic1 (topic1)
@@ -211,19 +212,19 @@ object ConfigCommand extends Config {
// --entity-type users --entity-default --entity-type clients --entity-default (Default <user, client>)
(root.sanitizedName, child) match {
case (None, _) =>
- val rootEntities = zkUtils.getAllEntitiesWithConfig(root.entityType)
+ val rootEntities = zkClient.getAllEntitiesWithConfig(root.entityType)
.map(name => ConfigEntity(Entity(root.entityType, Some(name)), child))
child match {
case Some(s) =>
rootEntities.flatMap(rootEntity =>
- ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkUtils))
+ ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkClient))
case None => rootEntities
}
case (_, Some(childEntity)) =>
childEntity.sanitizedName match {
case Some(_) => Seq(this)
case None =>
- zkUtils.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
+ zkClient.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
.map(name => ConfigEntity(root, Some(Entity(childEntity.entityType, Some(name)))))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index f2a74a0..bdd8aaf 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -25,18 +25,19 @@ import kafka.utils.Implicits._
import kafka.consumer.Whitelist
import kafka.log.LogConfig
import kafka.server.ConfigType
-import kafka.utils.ZkUtils._
import kafka.utils._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Utils
+import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
import scala.collection._
-
object TopicCommand extends Logging {
def main(args: Array[String]): Unit = {
@@ -53,36 +54,35 @@ object TopicCommand extends Logging {
opts.checkArgs()
- val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
- 30000,
- 30000,
- JaasUtils.isZkSecurityEnabled())
+ val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, Int.MaxValue)
+ val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled())
+
var exitCode = 0
try {
if(opts.options.has(opts.createOpt))
- createTopic(zkUtils, opts)
+ createTopic(zkClient, opts)
else if(opts.options.has(opts.alterOpt))
- alterTopic(zkUtils, opts)
+ alterTopic(zkClient, opts)
else if(opts.options.has(opts.listOpt))
- listTopics(zkUtils, opts)
+ listTopics(zkClient, opts)
else if(opts.options.has(opts.describeOpt))
- describeTopic(zkUtils, opts)
+ describeTopic(zkClient, opts)
else if(opts.options.has(opts.deleteOpt))
- deleteTopic(zkUtils, opts)
+ deleteTopic(zkClient, opts)
} catch {
case e: Throwable =>
println("Error while executing topic command : " + e.getMessage)
error(Utils.stackTrace(e))
exitCode = 1
} finally {
- zkUtils.close()
+ zkClient.close()
Exit.exit(exitCode)
}
}
- private def getTopics(zkUtils: ZkUtils, opts: TopicCommandOptions): Seq[String] = {
- val allTopics = zkUtils.getAllTopics().sorted
+ private def getTopics(zkClient: KafkaZkClient, opts: TopicCommandOptions): Seq[String] = {
+ val allTopics = zkClient.getAllTopicsInCluster.sorted
if (opts.options.has(opts.topicOpt)) {
val topicsSpec = opts.options.valueOf(opts.topicOpt)
val topicsFilter = new Whitelist(topicsSpec)
@@ -91,23 +91,24 @@ object TopicCommand extends Logging {
allTopics
}
- def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
+ def createTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
val topic = opts.options.valueOf(opts.topicOpt)
val configs = parseTopicConfigsToBeAdded(opts)
val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
if (Topic.hasCollisionChars(topic))
println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
+ val adminZkClient = new AdminZkClient(zkClient)
try {
if (opts.options.has(opts.replicaAssignmentOpt)) {
val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignment, configs, update = false)
} else {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
else RackAwareMode.Enforced
- AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
+ adminZkClient.createTopic(topic, partitions, replicas, configs, rackAwareMode)
}
println("Created topic \"%s\".".format(topic))
} catch {
@@ -115,15 +116,16 @@ object TopicCommand extends Logging {
}
}
- def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
- val topics = getTopics(zkUtils, opts)
+ def alterTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+ val topics = getTopics(zkClient, opts)
val ifExists = opts.options.has(opts.ifExistsOpt)
if (topics.isEmpty && !ifExists) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
opts.options.valueOf(opts.zkConnectOpt)))
}
+ val adminZkClient = new AdminZkClient(zkClient)
topics.foreach { topic =>
- val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+ val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
println("WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.")
println(" Going forward, please use kafka-configs.sh for this functionality")
@@ -133,7 +135,7 @@ object TopicCommand extends Logging {
// compile the final set of configs
configs ++= configsToBeAdded
configsToBeDeleted.foreach(config => configs.remove(config))
- AdminUtils.changeTopicConfig(zkUtils, topic, configs)
+ adminZkClient.changeTopicConfig(topic, configs)
println("Updated config for topic \"%s\".".format(topic))
}
@@ -144,7 +146,7 @@ object TopicCommand extends Logging {
println("WARNING: If partitions are increased for a topic that has a key, the partition " +
"logic or ordering of the messages will be affected")
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
- val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
+ val existingAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)).map {
case (topicPartition, replicas) => topicPartition.partition -> replicas
}
if (existingAssignment.isEmpty)
@@ -155,17 +157,17 @@ object TopicCommand extends Logging {
val partitionList = replicaAssignmentString.split(",").drop(startPartitionId)
AdminUtils.parseReplicaAssignment(partitionList.mkString(","), startPartitionId)
}
- val allBrokers = AdminUtils.getBrokerMetadatas(zkUtils)
- AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers, nPartitions, newAssignment)
+ val allBrokers = adminZkClient.getBrokerMetadatas()
+ adminZkClient.addPartitions(topic, existingAssignment, allBrokers, nPartitions, newAssignment)
println("Adding partitions succeeded!")
}
}
}
- def listTopics(zkUtils: ZkUtils, opts: TopicCommandOptions) {
- val topics = getTopics(zkUtils, opts)
+ def listTopics(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+ val topics = getTopics(zkClient, opts)
for(topic <- topics) {
- if (zkUtils.isTopicMarkedForDeletion(topic)) {
+ if (zkClient.isTopicMarkedForDeletion(topic)) {
println("%s - marked for deletion".format(topic))
} else {
println(topic)
@@ -173,8 +175,8 @@ object TopicCommand extends Logging {
}
}
- def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
- val topics = getTopics(zkUtils, opts)
+ def deleteTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+ val topics = getTopics(zkClient, opts)
val ifExists = opts.options.has(opts.ifExistsOpt)
if (topics.isEmpty && !ifExists) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
@@ -185,12 +187,12 @@ object TopicCommand extends Logging {
if (Topic.isInternal(topic)) {
throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
} else {
- zkUtils.createPersistentPath(getDeleteTopicPath(topic))
+ zkClient.createDeleteTopicPath(topic)
println("Topic %s is marked for deletion.".format(topic))
println("Note: This will have no impact if delete.topic.enable is not set to true.")
}
} catch {
- case _: ZkNodeExistsException =>
+ case _: NodeExistsException =>
println("Topic %s is already marked for deletion.".format(topic))
case e: AdminOperationException =>
throw e
@@ -200,21 +202,23 @@ object TopicCommand extends Logging {
}
}
- def describeTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
- val topics = getTopics(zkUtils, opts)
+ def describeTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+ val topics = getTopics(zkClient, opts)
val reportUnderReplicatedPartitions = opts.options.has(opts.reportUnderReplicatedPartitionsOpt)
val reportUnavailablePartitions = opts.options.has(opts.reportUnavailablePartitionsOpt)
val reportOverriddenConfigs = opts.options.has(opts.topicsWithOverridesOpt)
- val liveBrokers = zkUtils.getAllBrokersInCluster().map(_.id).toSet
+ val liveBrokers = zkClient.getAllBrokersInCluster.map(_.id).toSet
+ val adminZkClient = new AdminZkClient(zkClient)
+
for (topic <- topics) {
- zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic) match {
+ zkClient.getPartitionAssignmentForTopics(immutable.Set(topic)).get(topic) match {
case Some(topicPartitionAssignment) =>
val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
val describePartitions: Boolean = !reportOverriddenConfigs
val sortedPartitions = topicPartitionAssignment.toSeq.sortBy(_._1)
- val markedForDeletion = zkUtils.isTopicMarkedForDeletion(topic)
+ val markedForDeletion = zkClient.isTopicMarkedForDeletion(topic)
if (describeConfigs) {
- val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic).asScala
+ val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic).asScala
if (!reportOverriddenConfigs || configs.nonEmpty) {
val numPartitions = topicPartitionAssignment.size
val replicationFactor = topicPartitionAssignment.head._2.size
@@ -226,8 +230,10 @@ object TopicCommand extends Logging {
}
if (describePartitions) {
for ((partitionId, assignedReplicas) <- sortedPartitions) {
- val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionId)
- val leader = zkUtils.getLeaderForPartition(topic, partitionId)
+ val leaderIsrEpoch = zkClient.getTopicPartitionState(new TopicPartition(topic, partitionId))
+ val inSyncReplicas = if (leaderIsrEpoch.isEmpty) Seq.empty[Int] else leaderIsrEpoch.get.leaderAndIsr.isr
+ val leader = if (leaderIsrEpoch.isEmpty) None else Option(leaderIsrEpoch.get.leaderAndIsr.leader)
+
if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
(reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
(reportUnavailablePartitions && (leader.isEmpty || !liveBrokers.contains(leader.get)))) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 935fade..8f69000 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -23,6 +23,7 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
+import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.NewPartitions
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
@@ -41,11 +42,12 @@ import scala.collection.JavaConverters._
class AdminManager(val config: KafkaConfig,
val metrics: Metrics,
val metadataCache: MetadataCache,
- val zkUtils: ZkUtils) extends Logging with KafkaMetricsGroup {
+ val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
+ private val adminZkClient = new AdminZkClient(zkClient)
private val createTopicPolicy =
Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy]))
@@ -101,7 +103,7 @@ class AdminManager(val config: KafkaConfig,
createTopicPolicy match {
case Some(policy) =>
- AdminUtils.validateCreateOrUpdateTopic(zkUtils, topic, assignments, configs, update = false)
+ adminZkClient.validateCreateOrUpdateTopic(topic, assignments, configs, update = false)
// Use `null` for unset fields in the public API
val numPartitions: java.lang.Integer =
@@ -114,13 +116,13 @@ class AdminManager(val config: KafkaConfig,
arguments.configs))
if (!validateOnly)
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignments, configs, update = false)
case None =>
if (validateOnly)
- AdminUtils.validateCreateOrUpdateTopic(zkUtils, topic, assignments, configs, update = false)
+ adminZkClient.validateCreateOrUpdateTopic(topic, assignments, configs, update = false)
else
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignments, configs, update = false)
}
CreatePartitionsMetadata(topic, assignments, ApiError.NONE)
} catch {
@@ -165,7 +167,7 @@ class AdminManager(val config: KafkaConfig,
// 1. map over topics calling the asynchronous delete
val metadata = topics.map { topic =>
try {
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
DeleteTopicMetadata(topic, Errors.NONE)
} catch {
case _: TopicAlreadyMarkedForDeletionException =>
@@ -203,8 +205,8 @@ class AdminManager(val config: KafkaConfig,
listenerName: ListenerName,
callback: Map[String, ApiError] => Unit): Unit = {
- val reassignPartitionsInProgress = zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)
- val allBrokers = AdminUtils.getBrokerMetadatas(zkUtils)
+ val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress
+ val allBrokers = adminZkClient.getBrokerMetadatas()
val allBrokerIds = allBrokers.map(_.id)
// 1. map over topics creating assignment and calling AdminUtils
@@ -215,7 +217,7 @@ class AdminManager(val config: KafkaConfig,
if (reassignPartitionsInProgress)
throw new ReassignmentInProgressException("A partition reassignment is in progress.")
- val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
+ val existingAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)).map {
case (topicPartition, replicas) => topicPartition.partition -> replicas
}
if (existingAssignment.isEmpty)
@@ -247,7 +249,7 @@ class AdminManager(val config: KafkaConfig,
}.toMap
}
- val updatedReplicaAssignment = AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers,
+ val updatedReplicaAssignment = adminZkClient.addPartitions(topic, existingAssignment, allBrokers,
newPartition.totalCount, reassignment, validateOnly = validateOnly)
CreatePartitionsMetadata(topic, updatedReplicaAssignment, ApiError.NONE)
} catch {
@@ -306,7 +308,7 @@ class AdminManager(val config: KafkaConfig,
val topic = resource.name
Topic.validate(topic)
// Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
- val topicProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+ val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
createResponseConfig(logConfig, isReadOnly = false, name => !topicProps.containsKey(name))
@@ -350,19 +352,19 @@ class AdminManager(val config: KafkaConfig,
alterConfigPolicy match {
case Some(policy) =>
- AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+ adminZkClient.validateTopicConfig(topic, properties)
val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
policy.validate(new AlterConfigPolicy.RequestMetadata(
new ConfigResource(ConfigResource.Type.TOPIC, resource.name), configEntriesMap.asJava))
if (!validateOnly)
- AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+ adminZkClient.changeTopicConfig(topic, properties)
case None =>
if (validateOnly)
- AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+ adminZkClient.validateTopicConfig(topic, properties)
else
- AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+ adminZkClient.changeTopicConfig(topic, properties)
}
resource -> ApiError.NONE
case resourceType =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 6392723..457742d 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -26,7 +26,7 @@ import scala.collection._
import scala.collection.JavaConverters._
import kafka.admin.AdminUtils
import kafka.utils.json.JsonObject
-import kafka.zk.KafkaZkClient
+import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.security.scram.ScramMechanism
import org.apache.kafka.common.utils.Time
@@ -84,11 +84,11 @@ object ConfigEntityName {
* on startup where a change might be missed between the initial config load and registering for change notifications.
*
*/
-class DynamicConfigManager(private val oldZkUtils: ZkUtils,
- private val zkClient: KafkaZkClient,
+class DynamicConfigManager(private val zkClient: KafkaZkClient,
private val configHandlers: Map[String, ConfigHandler],
private val changeExpirationMs: Long = 15*60*1000,
private val time: Time = Time.SYSTEM) extends Logging {
+ val adminZkClient = new AdminZkClient(zkClient)
object ConfigChangedNotificationHandler extends NotificationHandler {
override def processNotification(json: String) = {
@@ -120,7 +120,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
throw new IllegalArgumentException("Version 1 config change notification does not specify 'entity_name'. Received: " + json)
}
- val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, entityType, entity)
+ val entityConfig = adminZkClient.fetchEntityConfig(entityType, entity)
info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
configHandlers(entityType).processConfigChanges(entity, entityConfig)
@@ -141,7 +141,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
}
val fullSanitizedEntityName = entityPath.substring(index + 1)
- val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, rootEntityType, fullSanitizedEntityName)
+ val entityConfig = adminZkClient.fetchEntityConfig(rootEntityType, fullSanitizedEntityName)
val loggableConfig = entityConfig.asScala.map {
case (k, v) => (k, if (ScramMechanism.isScram(k)) Password.HIDDEN else v)
}
@@ -163,14 +163,14 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
// Apply all existing client/user configs to the ClientIdConfigHandler/UserConfigHandler to bootstrap the overrides
configHandlers.foreach {
case (ConfigType.User, handler) =>
- AdminUtils.fetchAllEntityConfigs(oldZkUtils, ConfigType.User).foreach {
+ adminZkClient.fetchAllEntityConfigs(ConfigType.User).foreach {
case (sanitizedUser, properties) => handler.processConfigChanges(sanitizedUser, properties)
}
- AdminUtils.fetchAllChildEntityConfigs(oldZkUtils, ConfigType.User, ConfigType.Client).foreach {
+ adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach {
case (sanitizedUserClientId, properties) => handler.processConfigChanges(sanitizedUserClientId, properties)
}
case (configType, handler) =>
- AdminUtils.fetchAllEntityConfigs(oldZkUtils, configType).foreach {
+ adminZkClient.fetchAllEntityConfigs(configType).foreach {
case (entityName, properties) => handler.processConfigChanges(entityName, properties)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index de56986..a31b6c3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -29,7 +29,7 @@ import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.cluster.Partition
import kafka.common.{OffsetAndMetadata, OffsetMetadata}
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.controller.{KafkaController}
+import kafka.controller.KafkaController
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.{Log, LogManager, TimestampOffset}
@@ -37,8 +37,8 @@ import kafka.network.RequestChannel
import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendAction}
import kafka.security.SecurityUtils
import kafka.security.auth.{Resource, _}
-import kafka.utils.{CoreUtils, Logging, ZkUtils}
-import kafka.zk.KafkaZkClient
+import kafka.utils.{CoreUtils, Logging}
+import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
@@ -71,7 +71,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val groupCoordinator: GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val controller: KafkaController,
- val zkUtils: ZkUtils,
val zkClient: KafkaZkClient,
val brokerId: Int,
val config: KafkaConfig,
@@ -84,6 +83,7 @@ class KafkaApis(val requestChannel: RequestChannel,
time: Time) extends Logging {
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
+ val adminZkClient = new AdminZkClient(zkClient)
def close() {
info("Shutdown complete.")
@@ -829,7 +829,7 @@ class KafkaApis(val requestChannel: RequestChannel,
replicationFactor: Int,
properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
try {
- AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
+ adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
.format(topic, numPartitions, replicationFactor))
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1812eb0..7f61479 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -242,7 +242,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
kafkaController = new KafkaController(config, zkClient, time, metrics, threadNamePrefix)
kafkaController.startup()
- adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
+ adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
@@ -263,7 +263,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
- kafkaController, zkUtils, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+ kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
brokerTopicStats, clusterId, time)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
@@ -278,7 +278,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
// Create the config manager. start listening to notifications
- dynamicConfigManager = new DynamicConfigManager(zkUtils, zkClient, dynamicConfigHandlers)
+ dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
/* tell everyone we are alive */
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/zk/AdminZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
new file mode 100644
index 0000000..e00b8e6
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import java.util.Properties
+
+import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, RackAwareMode}
+import kafka.common.TopicAlreadyMarkedForDeletionException
+import kafka.log.LogConfig
+import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
+import org.apache.zookeeper.KeeperException.NodeExistsException
+
+import scala.collection.{Map, Seq}
+
+class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
+
+ /**
+  * Creates a topic by computing a replica assignment over the brokers returned by
+  * [[getBrokerMetadatas]] and then writing the topic config and assignment.
+  *
+  * @param topic topic name to create
+  * @param partitions number of partitions to be set
+  * @param replicationFactor replication factor
+  * @param topicConfig topic configs
+  * @param rackAwareMode rack-awareness mode passed on to [[getBrokerMetadatas]]
+  */
+ def createTopic(topic: String,
+                 partitions: Int,
+                 replicationFactor: Int,
+                 topicConfig: Properties = new Properties,
+                 rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
+   val assignment =
+     AdminUtils.assignReplicasToBrokers(getBrokerMetadatas(rackAwareMode), partitions, replicationFactor)
+   createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignment, topicConfig)
+ }
+
+ /**
+  * Builds the broker metadata used for replica assignment from the brokers
+  * returned by `zkClient.getAllBrokersInCluster`.
+  *
+  * Rack information is dropped when `rackAwareMode` is `Disabled`, or when it is
+  * `Safe` and at least one selected broker has no rack.
+  *
+  * @param rackAwareMode how broker rack information is treated
+  * @param brokerList optional broker ids to restrict the result to; all brokers when `None`
+  * @return broker metadata sorted by broker id
+  * @throws AdminOperationException in `Enforced` mode when some, but not all, selected brokers have a rack
+  */
+ def getBrokerMetadatas(rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
+                        brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
+   val registered = zkClient.getAllBrokersInCluster
+   val selected = brokerList match {
+     case Some(brokerIds) => registered.filter(broker => brokerIds.contains(broker.id))
+     case None => registered
+   }
+   val rackedCount = selected.count(_.rack.nonEmpty)
+   val someRacksMissing = rackedCount < selected.size
+
+   // A partially racked cluster is only an error when rack awareness is enforced.
+   if (rackAwareMode == RackAwareMode.Enforced && rackedCount > 0 && someRacksMissing) {
+     throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" +
+       " to make replica assignment without rack information.")
+   }
+
+   val ignoreRacks = rackAwareMode == RackAwareMode.Disabled ||
+     (rackAwareMode == RackAwareMode.Safe && someRacksMissing)
+   selected
+     .map(broker => BrokerMetadata(broker.id, if (ignoreRacks) None else broker.rack))
+     .sortBy(_.id)
+ }
+
+ /**
+  * Validates and then writes the partition assignment for a topic. When the topic is being
+  * created (`update` is false) the topic config is written first.
+  *
+  * @param topic topic whose assignment is written
+  * @param partitionReplicaAssignment map from partition id to its replica broker ids
+  * @param config topic configs; only written when `update` is false
+  * @param update true to update an existing topic's assignment, false to create the topic
+  */
+ def createOrUpdateTopicPartitionAssignmentPathInZK(topic: String,
+                                                    partitionReplicaAssignment: Map[Int, Seq[Int]],
+                                                    config: Properties = new Properties,
+                                                    update: Boolean = false) {
+   validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, update)
+
+   // Configs only matter on creation; changing configs via AlterTopic is not supported.
+   // The config write is not transactional with the assignment write that follows.
+   val creating = !update
+   if (creating)
+     zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
+
+   writeTopicPartitionAssignment(topic, partitionReplicaAssignment, update)
+ }
+
+ /**
+  * Validation to run before a topic is created or its assignment is updated.
+  *
+  * Checks the topic name, and on creation also checks that the topic neither exists nor
+  * collides with an existing topic name and that the configs are valid. In both modes every
+  * partition must have the same number of replicas and no duplicate replicas.
+  *
+  * @param topic topic name
+  * @param partitionReplicaAssignment map from partition id to its replica broker ids
+  * @param config topic configs; only validated when `update` is false
+  * @param update true when an existing topic is being updated
+  */
+ def validateCreateOrUpdateTopic(topic: String,
+                                 partitionReplicaAssignment: Map[Int, Seq[Int]],
+                                 config: Properties,
+                                 update: Boolean): Unit = {
+
+   // Creation-only check: the name must be new and must not collide with an existing name.
+   def ensureTopicIsNew(): Unit = {
+     if (zkClient.topicExists(topic))
+       throw new TopicExistsException(s"Topic '$topic' already exists.")
+     if (Topic.hasCollisionChars(topic)) {
+       val allTopics = zkClient.getAllTopicsInCluster
+       // check again in case the topic was created in the meantime, otherwise the
+       // topic could potentially collide with itself
+       if (allTopics.contains(topic))
+         throw new TopicExistsException(s"Topic '$topic' already exists.")
+       val collidingTopics = allTopics.filter(Topic.hasCollision(topic, _))
+       if (collidingTopics.nonEmpty)
+         throw new InvalidTopicException(s"Topic '$topic' collides with existing topics: ${collidingTopics.mkString(", ")}")
+     }
+   }
+
+   Topic.validate(topic)
+   if (!update)
+     ensureTopicIsNew()
+
+   val distinctReplicaCounts = partitionReplicaAssignment.values.map(_.size).toSet
+   if (distinctReplicaCounts.size != 1)
+     throw new InvalidReplicaAssignmentException("All partitions should have the same number of replicas")
+
+   for (replicas <- partitionReplicaAssignment.values if replicas.size != replicas.toSet.size)
+     throw new InvalidReplicaAssignmentException("Duplicate replica assignment found: " + partitionReplicaAssignment)
+
+   // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
+   if (!update)
+     LogConfig.validate(config)
+ }
+
+ /**
+  * Writes the replica assignment for `topic`, creating the assignment when `update` is false
+  * and overwriting it otherwise.
+  *
+  * A `NodeExistsException` is reported as `TopicExistsException`; any other throwable is
+  * rethrown as an `AdminOperationException` carrying the original's `toString`.
+  * NOTE(review): the catch-all also wraps fatal errors and drops the original stack trace;
+  * consider narrowing it (e.g. `NonFatal`) once callers are confirmed not to rely on this.
+  */
+ private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
+   try {
+     val assignment = replicaAssignment.map { case (partitionId, replicas) =>
+       new TopicPartition(topic,partitionId) -> replicas
+     }.toMap
+
+     if (update) {
+       info("Topic update " + assignment)
+       zkClient.setTopicAssignment(topic, assignment)
+     } else {
+       info("Topic creation " + assignment)
+       zkClient.createTopicAssignment(topic, assignment)
+     }
+     debug("Updated path %s with %s for replica assignment".format(TopicZNode.path(topic), assignment))
+   } catch {
+     case _: NodeExistsException => throw new TopicExistsException(s"Topic '$topic' already exists.")
+     case e2: Throwable => throw new AdminOperationException(e2.toString)
+   }
+ }
+
+
+ /**
+  * Marks a topic for deletion by creating its delete path.
+  *
+  * @param topic topic to delete
+  * @throws UnknownTopicOrPartitionException if the topic does not exist
+  * @throws TopicAlreadyMarkedForDeletionException if the delete path already exists
+  * @throws AdminOperationException if creating the delete path fails for any other reason
+  */
+ def deleteTopic(topic: String) {
+   // Guard first: nothing to mark if the topic is unknown.
+   if (!zkClient.topicExists(topic))
+     throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
+
+   try zkClient.createDeleteTopicPath(topic)
+   catch {
+     case _: NodeExistsException =>
+       throw new TopicAlreadyMarkedForDeletionException("topic %s is already marked for deletion".format(topic))
+     case e: Throwable =>
+       throw new AdminOperationException(e.getMessage)
+   }
+ }
+
+ /**
+  * Add partitions to existing topic with optional replica assignment
+  *
+  * The replication factor for the new partitions is taken from the existing assignment of
+  * partition 0. When no manual assignment is given, one is generated with
+  * `AdminUtils.assignReplicasToBrokers`.
+  *
+  * @param topic Topic for adding partitions to
+  * @param existingAssignment A map from partition id to its assigned replicas
+  * @param allBrokers All brokers in the cluster
+  * @param numPartitions Number of partitions to be set
+  * @param replicaAssignment Manual replica assignment, or none
+  * @param validateOnly If true, validate the parameters without actually adding the partitions
+  * @return the updated replica assignment
+  * @throws AdminOperationException if the existing assignment has no entry for partition 0
+  * @throws InvalidPartitionsException if `numPartitions` does not exceed the current partition count
+  */
+ def addPartitions(topic: String,
+                   existingAssignment: Map[Int, Seq[Int]],
+                   allBrokers: Seq[BrokerMetadata],
+                   numPartitions: Int = 1,
+                   replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
+                   validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
+   val partition0Replicas = existingAssignment.getOrElse(0,
+     throw new AdminOperationException(
+       s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " +
+         s"Assignment: $existingAssignment"))
+
+   val currentPartitionCount = existingAssignment.size
+   val partitionsToAdd = numPartitions - currentPartitionCount
+   if (partitionsToAdd <= 0)
+     throw new InvalidPartitionsException(
+       s"The number of partitions for a topic can only be increased. " +
+         s"Topic $topic currently has ${existingAssignment.size} partitions, " +
+         s"$numPartitions would not be an increase.")
+
+   val replicationFactor = partition0Replicas.size
+   replicaAssignment.foreach { manualAssignment =>
+     validateReplicaAssignment(manualAssignment, replicationFactor, allBrokers.map(_.id).toSet)
+   }
+
+   val newPartitionsAssignment = replicaAssignment match {
+     case Some(manualAssignment) => manualAssignment
+     case None =>
+       // Start from the first broker whose id is >= partition 0's first replica (or index 0).
+       val startIndex = math.max(0, allBrokers.indexWhere(_.id >= partition0Replicas.head))
+       AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd, replicationFactor,
+         startIndex, currentPartitionCount)
+   }
+
+   val proposedAssignment = existingAssignment ++ newPartitionsAssignment
+   if (!validateOnly) {
+     info(s"Creating $partitionsToAdd partitions for '$topic' with the following replica assignment: " +
+       s"$newPartitionsAssignment.")
+     // add the combined new list
+     createOrUpdateTopicPartitionAssignmentPathInZK(topic, proposedAssignment, update = true)
+   }
+   proposedAssignment
+ }
+
+ /**
+  * Validates a manually supplied replica assignment.
+  *
+  * Every partition must have at least one replica, no duplicate brokers, and only brokers
+  * contained in `availableBrokerIds`. After those per-partition checks, all partitions must
+  * have exactly `expectedReplicationFactor` replicas.
+  *
+  * @param replicaAssignment map from partition id to its proposed replica broker ids
+  * @param expectedReplicationFactor replication factor every partition must have
+  * @param availableBrokerIds broker ids that may appear in the assignment
+  * @throws InvalidReplicaAssignmentException on an empty or duplicated replica list, or on a
+  *                                           replication factor mismatch
+  * @throws BrokerNotAvailableException if a replica refers to a broker outside `availableBrokerIds`
+  */
+ private def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]],
+                                       expectedReplicationFactor: Int,
+                                       availableBrokerIds: Set[Int]): Unit = {
+
+   replicaAssignment.foreach { case (partitionId, replicas) =>
+     if (replicas.isEmpty)
+       throw new InvalidReplicaAssignmentException(
+         s"Cannot have replication factor of 0 for partition id $partitionId.")
+     // Built once and reused by the duplicate check and the availability check.
+     val distinctReplicas = replicas.toSet
+     if (replicas.size != distinctReplicas.size)
+       throw new InvalidReplicaAssignmentException(
+         "Duplicate brokers not allowed in replica assignment: " +
+           s"${replicas.mkString(", ")} for partition id $partitionId.")
+     if (!distinctReplicas.subsetOf(availableBrokerIds))
+       throw new BrokerNotAvailableException(
+         s"Some brokers specified for partition id $partitionId are not available. " +
+           s"Specified brokers: ${replicas.mkString(", ")}, " +
+           s"available brokers: ${availableBrokerIds.mkString(", ")}.")
+   }
+
+   // Collect every partition whose replica count differs from the expected factor so the
+   // error can report all of them at once.
+   val badRepFactors = replicaAssignment.collect {
+     case (partition, replicas) if replicas.size != expectedReplicationFactor => partition -> replicas.size
+   }
+   if (badRepFactors.nonEmpty) {
+     val sortedBadRepFactors = badRepFactors.toSeq.sortBy { case (partitionId, _) => partitionId }
+     val partitions = sortedBadRepFactors.map { case (partitionId, _) => partitionId }
+     val repFactors = sortedBadRepFactors.map { case (_, rf) => rf }
+     throw new InvalidReplicaAssignmentException("Inconsistent replication factor between partitions, " +
+       s"partition 0 has ${expectedReplicationFactor} while partitions [${partitions.mkString(", ")}] have " +
+       s"replication factors [${repFactors.mkString(", ")}], respectively.")
+   }
+ }
+
+ /**
+  * Change the configs for a given entityType and entityName by dispatching to the
+  * type-specific change method.
+  *
+  * @param entityType one of `ConfigType.Topic`, `ConfigType.Client`, `ConfigType.User` or `ConfigType.Broker`
+  * @param entityName name of the entity; for brokers it must parse as a single integer
+  * @param configs configs to apply to the entity
+  * @throws IllegalArgumentException if `entityType` is not one of the supported types, or a
+  *                                  broker `entityName` is not an integer
+  */
+ def changeConfigs(entityType: String, entityName: String, configs: Properties): Unit = {
+
+   def parseBroker(broker: String): Int = {
+     try broker.toInt
+     catch {
+       case _: NumberFormatException =>
+         throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
+     }
+   }
+
+   entityType match {
+     case ConfigType.Topic => changeTopicConfig(entityName, configs)
+     case ConfigType.Client => changeClientIdConfig(entityName, configs)
+     case ConfigType.User => changeUserOrUserClientIdConfig(entityName, configs)
+     case ConfigType.Broker => changeBrokerConfig(Seq(parseBroker(entityName)), configs)
+     // The message lists every type handled above, including users.
+     case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of " +
+       s"${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.User}, ${ConfigType.Broker}")
+   }
+ }
+
+ /**
+  * Update the config for a client and create a change notification so the change will propagate to other brokers.
+  * If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId>
+  * and <user> configs are not specified.
+  *
+  * @param sanitizedClientId: The sanitized clientId for which configs are being changed
+  * @param configs: The final set of configs that will be applied to the client. If any new configs need to be added or
+  *                 existing configs need to be deleted, it should be done prior to invoking this API
+  *
+  */
+ def changeClientIdConfig(sanitizedClientId: String, configs: Properties) {
+ // Validate against the client config definition before anything is written.
+ DynamicConfig.Client.validate(configs)
+ changeEntityConfig(ConfigType.Client, sanitizedClientId, configs)
+ }
+
+ /**
+  * Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers.
+  * User and/or clientId components of the path may be <default>, indicating that the configuration is the default
+  * value to be applied if a more specific override is not configured.
+  *
+  * Configs are validated with `DynamicConfig.Client` when the entity name is the default entity
+  * name or contains "/clients", and with `DynamicConfig.User` otherwise.
+  *
+  * @param sanitizedEntityName: <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId>
+  * @param configs: The final set of configs that will be applied to the entity. If any new configs need to be added or
+  *                 existing configs need to be deleted, it should be done prior to invoking this API
+  *
+  */
+ def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configs: Properties) {
+   val validateAsClient =
+     sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients")
+   if (validateAsClient) DynamicConfig.Client.validate(configs)
+   else DynamicConfig.User.validate(configs)
+
+   changeEntityConfig(ConfigType.User, sanitizedEntityName, configs)
+ }
+
+ /**
+  * Validates the topic name, checks that the topic exists and validates the configs.
+  *
+  * @param topic topic whose configs are being validated
+  * @param configs configs to validate with `LogConfig.validate`
+  * @throws AdminOperationException if the topic does not exist
+  */
+ def validateTopicConfig(topic: String, configs: Properties): Unit = {
+   Topic.validate(topic)
+   val topicKnown = zkClient.topicExists(topic)
+   if (!topicKnown)
+     throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
+   // Only reached for an existing topic with a valid name.
+   LogConfig.validate(configs)
+ }
+
+ /**
+  * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
+  *
+  * The topic name, topic existence and configs are validated first via `validateTopicConfig`.
+  *
+  * @param topic: The topic for which configs are being changed
+  * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
+  *                 existing configs need to be deleted, it should be done prior to invoking this API
+  *
+  */
+ def changeTopicConfig(topic: String, configs: Properties): Unit = {
+ // Nothing is written if validation throws.
+ validateTopicConfig(topic, configs)
+ changeEntityConfig(ConfigType.Topic, topic, configs)
+ }
+
+ /**
+  * Override the broker config on some set of brokers. These overrides will be persisted between sessions, and will
+  * override any defaults entered in the broker's config files
+  *
+  * The configs are validated once with `DynamicConfig.Broker` and then written for each broker in turn.
+  *
+  * @param brokers: The list of brokers to apply config changes to
+  * @param configs: The config to change, as properties
+  */
+ def changeBrokerConfig(brokers: Seq[Int], configs: Properties): Unit = {
+   DynamicConfig.Broker.validate(configs)
+   for (brokerId <- brokers)
+     changeEntityConfig(ConfigType.Broker, brokerId.toString, configs)
+ }
+
+ /**
+  * Writes the configs for an entity and then creates a config change notification
+  * for the path `<rootEntityType>/<fullSanitizedEntityName>`.
+  */
+ private def changeEntityConfig(rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) {
+   zkClient.setOrCreateEntityConfigs(rootEntityType, fullSanitizedEntityName, configs)
+
+   // The notification is created only after the config write has returned.
+   val notifiedPath = s"$rootEntityType/$fullSanitizedEntityName"
+   zkClient.createConfigChangeNotification(notifiedPath)
+ }
+
+ /**
+  * Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk
+  * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>.
+  *
+  * @param rootEntityType root entity type of the config to read
+  * @param sanitizedEntityName sanitized entity name, in one of the forms listed above
+  * @return the configs returned by `zkClient.getEntityConfigs` for that entity
+  */
+ def fetchEntityConfig(rootEntityType: String, sanitizedEntityName: String): Properties =
+   zkClient.getEntityConfigs(rootEntityType, sanitizedEntityName)
+
+ /**
+  * Gets the configs of every topic returned by `zkClient.getAllTopicsInCluster`.
+  *
+  * @return map from topic name to that topic's configs
+  */
+ def getAllTopicConfigs(): Map[String, Properties] = {
+   val topics = zkClient.getAllTopicsInCluster
+   topics.map(topic => topic -> fetchEntityConfig(ConfigType.Topic, topic)).toMap
+ }
+
+ /**
+  * Gets the configs of every entity listed under the given entityType.
+  *
+  * @param entityType entity type whose entities are read
+  * @return map from entity name to that entity's configs
+  */
+ def fetchAllEntityConfigs(entityType: String): Map[String, Properties] = {
+   val entities = zkClient.getAllEntitiesWithConfig(entityType)
+   entities.map(entity => entity -> fetchEntityConfig(entityType, entity)).toMap
+ }
+
+ /**
+  * Gets the configs of all child entities nested one level below the root entities, i.e.
+  * entities at `<rootEntity>/<childEntityType>/<childEntity>`.
+  *
+  * @param rootEntityType root entity type
+  * @param childEntityType child entity type looked up under every root entity
+  * @return map from the child entity path (relative to `rootEntityType`) to its configs
+  */
+ def fetchAllChildEntityConfigs(rootEntityType: String, childEntityType: String): Map[String, Properties] = {
+   // Lists entity names directly under rootEntityType (None) or under rootEntityType/path
+   // (Some); in the latter case the returned names are prefixed with that path.
+   def entityPaths(rootPath: Option[String]): Seq[String] = rootPath match {
+     case Some(path) =>
+       zkClient.getAllEntitiesWithConfig(rootEntityType + '/' + path).map(entityName => path + '/' + entityName)
+     case None =>
+       zkClient.getAllEntitiesWithConfig(rootEntityType)
+   }
+
+   val childPaths = for {
+     rootEntity <- entityPaths(None)
+     childPath <- entityPaths(Some(rootEntity + '/' + childEntityType))
+   } yield childPath
+
+   childPaths.map(childPath => childPath -> fetchEntityConfig(rootEntityType, childPath)).toMap
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 24d7ba9..b419654 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,7 +21,7 @@ import java.util.Properties
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
import kafka.log.LogConfig
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
import kafka.security.auth.{Acl, Resource, ResourceType}
@@ -33,8 +33,8 @@ import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException}
-import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
/**
* Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
@@ -168,7 +168,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
configResponse.resultCode match {
case Code.OK =>
val overrides = ConfigEntityZNode.decode(configResponse.data)
- val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties))
+ val logConfig = LogConfig.fromProps(config, overrides)
logConfigs.put(topic, logConfig)
case Code.NONODE =>
val logConfig = LogConfig.fromProps(config, new Properties)
@@ -180,32 +180,100 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Get entity configs for a given entity name
+ * @param rootEntityType entity type
+ * @param sanitizedEntityName entity name
+ * @return the configs decoded from the entity znode, or empty Properties if the znode does not exist
+ */
+ def getEntityConfigs(rootEntityType: String, sanitizedEntityName: String): Properties = {
+   val entityPath = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName)
+   val response = retryRequestUntilConnected(GetDataRequest(entityPath))
+
+   response.resultCode match {
+     // A missing znode means no overrides are stored for the entity.
+     case Code.NONODE => new Properties()
+     case Code.OK => ConfigEntityZNode.decode(response.data)
+     case _ => throw response.resultException.get
+   }
+ }
+
+ /**
+  * Sets or creates the entity znode path with the given configs depending
+  * on whether it already exists or not.
+  *
+  * A set is attempted first; only when it reports NONODE is the znode created.
+  *
+  * @param rootEntityType entity type
+  * @param sanitizedEntityName entity name
+  * @param config configs to encode and store in the entity znode
+  * @throws KeeperException if there is an error while setting or creating the znode
+  */
+ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties) = {
+   // Path and payload are computed once and shared by the set and the create attempt.
+   val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName)
+   val configData = ConfigEntityZNode.encode(config)
+
+   def set(): SetDataResponse =
+     retryRequestUntilConnected(SetDataRequest(path, configData, ZkVersion.NoVersion))
+
+   def create() = createRecursive(path, configData)
+
+   val setDataResponse = set()
+   setDataResponse.resultCode match {
+     case Code.NONODE => create()
+     case _ => setDataResponse.resultException.foreach(e => throw e)
+   }
+ }
+
+ /**
+  * Returns all the entities for a given entityType
+  *
+  * @param entityType entity type
+  * @return names of the children of the entity type's config znode
+  */
+ def getAllEntitiesWithConfig(entityType: String): Seq[String] =
+   getChildren(ConfigEntityTypeZNode.path(entityType))
+
+ /**
+  * Creates a config change notification as a persistent sequential znode whose
+  * payload is the encoded entity path.
+  *
+  * @param sanitizedEntityPath sanitized entity path to write into the notification
+  * @throws KeeperException if there is an error while creating the znode
+  */
+ def createConfigChangeNotification(sanitizedEntityPath: String): Unit = {
+   val notificationPath = ConfigEntityChangeNotificationSequenceZNode.createPath
+   val payload = ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath)
+   val response = retryRequestUntilConnected(
+     CreateRequest(notificationPath, payload, acls(notificationPath), CreateMode.PERSISTENT_SEQUENTIAL))
+
+   response.resultCode match {
+     case Code.OK => // created, nothing more to do
+     case _ => response.resultException.foreach(e => throw e)
+   }
+ }
+
+ /**
* Gets all brokers in the cluster.
* @return sequence of brokers in the cluster.
*/
def getAllBrokersInCluster: Seq[Broker] = {
- val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(BrokerIdsZNode.path))
- getChildrenResponse.resultCode match {
- case Code.OK =>
- val brokerIds = getChildrenResponse.children.map(_.toInt)
- val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
- val getDataResponses = retryRequestsUntilConnected(getDataRequests)
- getDataResponses.flatMap { getDataResponse =>
- val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
- getDataResponse.resultCode match {
- case Code.OK =>
- Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
- case Code.NONODE => None
- case _ => throw getDataResponse.resultException.get
- }
- }
- case Code.NONODE =>
- Seq.empty
- case _ =>
- throw getChildrenResponse.resultException.get
+ val brokerIds = getSortedBrokerList
+ val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
+ val getDataResponses = retryRequestsUntilConnected(getDataRequests)
+ getDataResponses.flatMap { getDataResponse =>
+ val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
+ getDataResponse.resultCode match {
+ case Code.OK =>
+ Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+ case Code.NONODE => None
+ case _ => throw getDataResponse.resultException.get
+ }
}
}
+
+ /**
+  * Gets the broker ids in ascending order.
+  * @return the child names of the broker ids znode, parsed as integers and sorted
+  */
+ def getSortedBrokerList(): Seq[Int] = {
+   val brokerIds = getChildren(BrokerIdsZNode.path).map(_.toInt)
+   brokerIds.sorted
+ }
+
/**
* Gets all topics in the cluster.
* @return sequence of topics in the cluster.
@@ -221,6 +289,15 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Checks the topic existence
+ * @param topicName
+ * @return true if topic exists else false
+ */
+ def topicExists(topicName: String): Boolean = {
+   // Existence is decided solely by the presence of the topic's znode.
+   val topicZNodePath = TopicZNode.path(topicName)
+   pathExists(topicZNodePath)
+ }
+
+ /**
* Sets the topic znode with the given assignment.
* @param topic the topic whose assignment is being set.
* @param assignment the partition to replica mapping to set for the given topic
@@ -232,6 +309,29 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Sets the topic znode with the given assignment.
+ * @param topic the topic whose assignment is being set.
+ * @param assignment the partition to replica mapping to set for the given topic
+ * @throws KeeperException if there is an error while setting assignment
+ */
+ def setTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]) = {
+   val response = setTopicAssignmentRaw(topic, assignment)
+   response.resultCode match {
+     case Code.OK => // assignment written
+     // Any non-OK result: surface the error carried by the response, if there is one.
+     case _ => response.resultException.foreach(e => throw e)
+   }
+ }
+
+ /**
+ * Create the topic znode with the given assignment.
+ * @param topic the topic whose assignment is being set.
+ * @param assignment the partition to replica mapping to set for the given topic
+ * @throws KeeperException if there is an error while creating assignment
+ */
+ def createTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]) = {
+   // Encode the assignment and create the topic znode with it as the payload.
+   val topicZNodePath = TopicZNode.path(topic)
+   val encodedAssignment = TopicZNode.encode(assignment)
+   createRecursive(topicZNodePath, encodedAssignment)
+ }
+
+ /**
* Gets the log dir event notifications as strings. These strings are the znode names and not the absolute znode path.
* @return sequence of znode names and not the absolute znode path.
*/
@@ -271,7 +371,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
if (getChildrenResponse.resultCode == Code.OK) {
deleteLogDirEventNotifications(getChildrenResponse.children)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
- throw getChildrenResponse.resultException.get
+ getChildrenResponse.resultException.foreach(e => throw e)
}
}
@@ -305,6 +405,40 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Gets the partition assignments for the given topics.
+ * @param topics the topics whose partitions we wish to get the assignments for.
+ * @return the partition assignment for each partition from the given topics.
+ */
+ def getPartitionAssignmentForTopics(topics: Set[String]): Map[String, Map[Int, Seq[Int]]] = {
+   // One read request per topic; the topic name travels in the request context
+   // so each response can be matched back to its topic.
+   val requests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
+   val responses = retryRequestsUntilConnected(requests.toSeq)
+   responses.flatMap { response =>
+     val topic = response.ctx.get.asInstanceOf[String]
+     response.resultCode match {
+       case Code.OK =>
+         // Re-key the decoded assignment by partition number.
+         val partitionMap = TopicZNode.decode(topic, response.data).map { case (k, v) => (k.partition, v) }
+         Map(topic -> partitionMap)
+       case Code.NONODE =>
+         // Topics without a znode are simply left out of the result.
+         Map.empty[String, Map[Int, Seq[Int]]]
+       case _ =>
+         throw response.resultException.get
+     }
+   }.toMap
+ }
+
+ /**
+ * Gets the partition numbers for the given topics
+ * @param topics the topics whose partitions we wish to get.
+ * @return the partition array for each topic from the given topics.
+ */
+ def getPartitionsForTopics(topics: Set[String]): Map[String, Seq[Int]] = {
+   // Keep only the partition numbers of each topic's assignment, in ascending order.
+   getPartitionAssignmentForTopics(topics).map { case (topic, partitionMap) =>
+     topic -> partitionMap.keys.toSeq.sorted
+   }
+ }
+
+ /**
* Gets the partition count for a given topic
* @param topic The topic to get partition count for.
* @return optional integer that is Some if the topic exists and None otherwise.
@@ -318,6 +452,16 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Gets the assigned replicas for a specific topic and partition
+ * @param topicPartition TopicPartition to get assigned replicas for.
+ * @return List of assigned replicas
+ */
+ def getReplicasForPartition(topicPartition: TopicPartition): Seq[Int] = {
+   // The assignment is looked up for the partition's whole topic; a partition
+   // missing from it yields an empty replica list.
+   getReplicaAssignmentForTopics(Set(topicPartition.topic)).getOrElse(topicPartition, Seq.empty)
+ }
+
+ /**
* Gets the data and version at the given zk path
* @param path zk node path
* @return A tuple of 2 elements, where first element is zk node data as string
@@ -413,6 +557,25 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Creates the delete topic znode.
+ * @param topicName topic name
+ * @throws KeeperException if there is an error while setting or creating the znode
+ */
+ def createDeleteTopicPath(topicName: String): Unit = {
+   // The delete-topic znode is created without a payload.
+   val deleteTopicPath = DeleteTopicsTopicZNode.path(topicName)
+   createRecursive(deleteTopicPath)
+ }
+
+
+ /**
+ * Checks if topic is marked for deletion
+ * @param topic
+ * @return true if topic is marked for deletion, else false
+ */
+ def isTopicMarkedForDeletion(topic: String): Boolean = {
+   // A topic counts as marked for deletion when its delete-topic znode exists.
+   val deleteTopicPath = DeleteTopicsTopicZNode.path(topic)
+   pathExists(deleteTopicPath)
+ }
+
+ /**
* Get all topics marked for deletion.
* @return sequence of topics marked for deletion.
*/
@@ -479,6 +642,21 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Creates the partition reassignment znode with the given reassignment.
+ * @param reassignment the reassignment to set on the reassignment znode.
+ * @throws KeeperException if there is an error while setting or creating the znode
+ */
+ def createPartitionReassignment(reassignment: Map[TopicPartition, Seq[Int]]) = {
+   val reassignmentPath = ReassignPartitionsZNode.path
+   // Plain (non-recursive) create of a PERSISTENT znode holding the encoded reassignment.
+   val response = retryRequestUntilConnected(
+     CreateRequest(reassignmentPath, ReassignPartitionsZNode.encode(reassignment),
+       acls(reassignmentPath), CreateMode.PERSISTENT))
+
+   response.resultCode match {
+     case Code.OK => // znode created
+     case _ => throw response.resultException.get
+   }
+ }
+
+ /**
* Deletes the partition reassignment znode.
*/
def deletePartitionReassignment(): Unit = {
@@ -487,6 +665,22 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Checks if reassign partitions is in progress
+ * @return true if reassign partitions is in progress, else false
+ */
+ def reassignPartitionsInProgress(): Boolean = {
+   // A reassignment is reported as in progress for as long as the reassignment znode exists.
+   val reassignmentPath = ReassignPartitionsZNode.path
+   pathExists(reassignmentPath)
+ }
+
+ /**
+ * Gets the partitions that are currently being reassigned
+ * @return the ReassignedPartitionsContext of each partition that is being reassigned.
+ */
+ def getPartitionsBeingReassigned(): Map[TopicPartition, ReassignedPartitionsContext] = {
+   // Build the result strictly. `mapValues` returns a lazy view that re-applies
+   // its function on every lookup, so each access would construct a brand-new
+   // ReassignedPartitionsContext instead of returning one stable instance per
+   // partition.
+   getPartitionReassignment.map { case (topicPartition, replicas) =>
+     topicPartition -> ReassignedPartitionsContext(replicas)
+   }
+ }
+
+ /**
* Gets topic partition states for the given partitions.
* @param partitions the partitions for which we want to get states.
* @return map containing the LeaderIsrAndControllerEpoch of each partition for which we were able to look up the partition state.
@@ -504,6 +698,35 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Gets topic partition state for the given partition.
+ * @param partition the partition for which we want to get state.
+ * @return LeaderIsrAndControllerEpoch of the partition state if exists, else None
+ */
+ def getTopicPartitionState(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
+   // A single partition is requested, so only the first response is inspected.
+   val response = getTopicPartitionStatesRaw(Seq(partition)).head
+   response.resultCode match {
+     case Code.OK => TopicPartitionStateZNode.decode(response.data, response.stat)
+     case Code.NONODE => None
+     case _ => throw response.resultException.get
+   }
+ }
+
+ /**
+ * Gets the leader for a given partition
+ * @param partition
+ * @return optional integer if the leader exists and None otherwise.
+ */
+ def getLeaderForPartition(partition: TopicPartition): Option[Int] = {
+   // Project the leader out of the partition state, if any; None propagates
+   // unchanged when no state could be read for the partition.
+   getTopicPartitionState(partition).map(_.leaderAndIsr.leader)
+ }
+
+ /**
* Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
* @return sequence of znode names and not the absolute znode path.
*/
@@ -543,7 +766,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
if (getChildrenResponse.resultCode == Code.OK) {
deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber))
} else if (getChildrenResponse.resultCode != Code.NONODE) {
- throw getChildrenResponse.resultException.get
+ getChildrenResponse.resultException.foreach(e => throw e)
}
}
@@ -641,9 +864,9 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* Creates the required zk nodes for Acl storage
*/
def createAclPaths(): Unit = {
- createRecursive(AclZNode.path)
- createRecursive(AclChangeNotificationZNode.path)
- ResourceType.values.foreach(resource => createRecursive(ResourceTypeZNode.path(resource.name)))
+ createRecursive(AclZNode.path, throwIfPathExists = false)
+ createRecursive(AclChangeNotificationZNode.path, throwIfPathExists = false)
+ ResourceType.values.foreach(resource => createRecursive(ResourceTypeZNode.path(resource.name), throwIfPathExists = false))
}
/**
@@ -719,7 +942,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
if (getChildrenResponse.resultCode == Code.OK) {
deleteAclChangeNotifications(getChildrenResponse.children)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
- throw getChildrenResponse.resultException.get
+ getChildrenResponse.resultException.foreach(e => throw e)
}
}
@@ -735,7 +958,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
val deleteResponses = retryRequestsUntilConnected(deleteRequests)
deleteResponses.foreach { deleteResponse =>
if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
- throw deleteResponse.resultException.get
+ deleteResponse.resultException.foreach(e => throw e)
}
}
}
@@ -790,7 +1013,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
case _ => throw deleteResponse.resultException.get
}
}
-
+
/**
* Deletes the zk node recursively
* @param path
@@ -889,7 +1112,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
}
- /**
+ /**
* Set the committed offset for a topic partition and group
* @param group the group whose offset is being set
* @param topicPartition the topic partition whose offset is being set
@@ -898,8 +1121,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
def setOrCreateConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): Unit = {
val setDataResponse = setConsumerOffset(group, topicPartition, offset)
if (setDataResponse.resultCode == Code.NONODE) {
- val createResponse = createConsumerOffset(group, topicPartition, offset)
- createResponse.resultException.foreach(e => throw e)
+ createConsumerOffset(group, topicPartition, offset)
} else {
setDataResponse.resultException.foreach(e => throw e)
}
@@ -911,17 +1133,9 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
retryRequestUntilConnected(setDataRequest)
}
- private def createConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): CreateResponse = {
+ private def createConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long) = {
val path = ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition)
- val createRequest = CreateRequest(path, ConsumerOffset.encode(offset), acls(path), CreateMode.PERSISTENT)
- var createResponse = retryRequestUntilConnected(createRequest)
- if (createResponse.resultCode == Code.NONODE) {
- val indexOfLastSlash = path.lastIndexOf("/")
- if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
- createRecursive(path.substring(0, indexOfLastSlash))
- createResponse = retryRequestUntilConnected(createRequest)
- }
- createResponse
+ createRecursive(path, ConsumerOffset.encode(offset))
}
/**
@@ -955,21 +1169,40 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
}
- private[zk] def createRecursive(path: String): Unit = {
- val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
- var createResponse = retryRequestUntilConnected(createRequest)
- if (createResponse.resultCode == Code.NONODE) {
+ private[zk] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = {
+
+ def parentPath(path: String): String = {
val indexOfLastSlash = path.lastIndexOf("/")
if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
- val parentPath = path.substring(0, indexOfLastSlash)
- createRecursive(parentPath)
- createResponse = retryRequestUntilConnected(createRequest)
- if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
+ path.substring(0, indexOfLastSlash)
+ }
+
+ def createRecursive0(path: String): Unit = {
+ val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
+ var createResponse = retryRequestUntilConnected(createRequest)
+ if (createResponse.resultCode == Code.NONODE) {
+ createRecursive0(parentPath(path))
+ createResponse = retryRequestUntilConnected(createRequest)
+ if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
+ throw createResponse.resultException.get
+ }
+ } else if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
throw createResponse.resultException.get
}
- } else if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
- throw createResponse.resultException.get
}
+
+ val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT)
+ var createResponse = retryRequestUntilConnected(createRequest)
+
+ if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) {
+ createResponse.resultException.foreach(e => throw e)
+ } else if (createResponse.resultCode == Code.NONODE) {
+ createRecursive0(parentPath(path))
+ createResponse = retryRequestUntilConnected(createRequest)
+ createResponse.resultException.foreach(e => throw e)
+ } else if (createResponse.resultCode != Code.NODEEXISTS)
+ createResponse.resultException.foreach(e => throw e)
+
}
private def createTopicPartition(partitions: Seq[TopicPartition]): Seq[CreateResponse] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 4c618a0..a0085cd 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -28,8 +28,6 @@ import kafka.utils.Json
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.data.Stat
-import scala.collection.Seq
-
// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
object ControllerZNode {
@@ -140,16 +138,29 @@ object ConfigEntityZNode {
import scala.collection.JavaConverters._
Json.encodeAsBytes(Map("version" -> 1, "config" -> config.asScala))
}
- def decode(bytes: Array[Byte]): Option[Properties] = {
- Json.parseBytes(bytes).map { js =>
- val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
- val props = new Properties()
- configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
- props
+ def decode(bytes: Array[Byte]): Properties = {
+ val props = new Properties()
+ if (bytes != null) {
+ Json.parseBytes(bytes).map { js =>
+ val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
+ configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
+ }
}
+ props
}
}
+// Path of the znode under which config change notifications are created.
+object ConfigEntityChangeNotificationZNode {
+  def path: String = ConfigZNode.path + "/changes"
+}
+
+// Naming and payload format of an individual config change notification znode.
+object ConfigEntityChangeNotificationSequenceZNode {
+  val SequenceNumberPrefix = "config_change_"
+  // Path prefix handed to the create request for a new notification znode.
+  def createPath: String = ConfigEntityChangeNotificationZNode.path + "/" + SequenceNumberPrefix
+  // Payload is a JSON object carrying a version marker and the sanitized entity path.
+  def encode(sanitizedEntityPath : String): Array[Byte] = {
+    val notification = Map("version" -> 2, "entity_path" -> sanitizedEntityPath)
+    Json.encodeAsBytes(notification)
+  }
+  // Returns the raw payload as a UTF-8 string; no JSON parsing is done here.
+  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
+}
+
object IsrChangeNotificationZNode {
def path = "/isr_change_notification"
}