You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/19 00:24:10 UTC
[5/5] kafka git commit: KAFKA-2639: Refactoring of ZkUtils
KAFKA-2639: Refactoring of ZkUtils
I've split the work of KAFKA-1695 because this refactoring touches a large number of files. Most of the changes are trivial, but I feel it will be easier to review this way.
This pull request includes the one Parth-Brahmbhatt started to address KAFKA-1695.
Author: flavio junqueira <fp...@apache.org>
Author: Flavio Junqueira <fp...@apache.org>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #303 from fpj/KAFKA-2639
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ce306ba4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ce306ba4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ce306ba4
Branch: refs/heads/trunk
Commit: ce306ba4ebc77464bf8ff4d656e1f1f44979182e
Parents: 78a2e2f
Author: flavio junqueira <fp...@apache.org>
Authored: Sun Oct 18 15:23:52 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Oct 18 15:23:52 2015 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 1 +
.../apache/kafka/common/security/JaasUtils.java | 64 +++
.../src/main/scala/kafka/admin/AdminUtils.scala | 129 +++---
.../main/scala/kafka/admin/ConfigCommand.scala | 26 +-
.../kafka/admin/ConsumerGroupCommand.scala | 74 ++--
.../PreferredReplicaLeaderElectionCommand.scala | 27 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 67 ++--
.../main/scala/kafka/admin/TopicCommand.scala | 65 +--
.../main/scala/kafka/client/ClientUtils.scala | 12 +-
.../main/scala/kafka/cluster/Partition.scala | 6 +-
.../ZkNodeChangeNotificationListener.scala | 14 +-
.../kafka/consumer/ConsumerFetcherManager.scala | 6 +-
.../kafka/consumer/PartitionAssignor.scala | 10 +-
.../main/scala/kafka/consumer/TopicCount.scala | 14 +-
.../consumer/ZookeeperConsumerConnector.scala | 58 +--
.../consumer/ZookeeperTopicEventWatcher.scala | 20 +-
.../kafka/controller/KafkaController.scala | 89 ++---
.../controller/PartitionLeaderSelector.scala | 2 +-
.../controller/PartitionStateMachine.scala | 39 +-
.../kafka/controller/ReplicaStateMachine.scala | 12 +-
.../kafka/controller/TopicDeletionManager.scala | 12 +-
.../kafka/coordinator/ConsumerCoordinator.scala | 16 +-
.../kafka/coordinator/CoordinatorMetadata.scala | 10 +-
.../security/auth/SimpleAclAuthorizer.scala | 42 +-
.../kafka/server/DynamicConfigManager.scala | 16 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 10 +-
.../scala/kafka/server/KafkaHealthcheck.scala | 7 +-
.../main/scala/kafka/server/KafkaServer.scala | 58 +--
.../main/scala/kafka/server/OffsetManager.scala | 4 +-
.../kafka/server/ReplicaFetcherThread.scala | 2 +-
.../scala/kafka/server/ReplicaManager.scala | 4 +-
.../kafka/server/ZookeeperLeaderElector.scala | 12 +-
.../kafka/tools/ConsumerOffsetChecker.scala | 40 +-
.../scala/kafka/tools/ExportZkOffsets.scala | 24 +-
.../scala/kafka/tools/ImportZkOffsets.scala | 9 +-
.../scala/kafka/tools/UpdateOffsetsInZK.scala | 19 +-
.../kafka/tools/VerifyConsumerRebalance.scala | 25 +-
.../scala/kafka/utils/ReplicationUtils.scala | 25 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 399 +++++++++++--------
.../kafka/api/ConsumerBounceTest.scala | 2 +-
.../integration/kafka/api/ConsumerTest.scala | 26 +-
.../kafka/api/IntegrationTestHarness.scala | 2 +-
.../kafka/api/ProducerBounceTest.scala | 4 +-
.../kafka/api/ProducerCompressionTest.scala | 2 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 14 +-
.../kafka/api/ProducerSendTest.scala | 14 +-
.../integration/kafka/api/QuotasTest.scala | 2 +-
.../integration/kafka/api/SSLConsumerTest.scala | 6 +-
.../kafka/api/SSLProducerSendTest.scala | 6 +-
.../test/scala/other/kafka/DeleteZKPath.scala | 4 +-
.../scala/other/kafka/TestOffsetManager.scala | 24 +-
.../unit/kafka/admin/AddPartitionsTest.scala | 34 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 107 +++--
.../kafka/admin/DeleteConsumerGroupTest.scala | 48 +--
.../unit/kafka/admin/DeleteTopicTest.scala | 85 ++--
.../unit/kafka/admin/TopicCommandTest.scala | 36 +-
.../ZkNodeChangeNotificationListenerTest.scala | 6 +-
.../kafka/consumer/ConsumerIteratorTest.scala | 2 +-
.../kafka/consumer/PartitionAssignorTest.scala | 23 +-
.../ZookeeperConsumerConnectorTest.scala | 38 +-
.../controller/ControllerFailoverTest.scala | 2 +-
.../coordinator/CoordinatorMetadataTest.scala | 52 +--
.../kafka/integration/AutoOffsetResetTest.scala | 2 +-
.../integration/BaseTopicMetadataTest.scala | 14 +-
.../unit/kafka/integration/FetcherTest.scala | 4 +-
.../kafka/integration/PrimitiveApiTest.scala | 8 +-
.../kafka/integration/RollingBounceTest.scala | 10 +-
.../integration/UncleanLeaderElectionTest.scala | 22 +-
.../ZookeeperConsumerConnectorTest.scala | 2 +-
.../scala/unit/kafka/metrics/MetricsTest.scala | 8 +-
.../unit/kafka/producer/ProducerTest.scala | 18 +-
.../unit/kafka/producer/SyncProducerTest.scala | 18 +-
.../security/auth/SimpleAclAuthorizerTest.scala | 6 +-
.../unit/kafka/server/AdvertiseBrokerTest.scala | 2 +-
.../kafka/server/BaseReplicaFetchTest.scala | 2 +-
.../kafka/server/DynamicConfigChangeTest.scala | 10 +-
.../server/HighwatermarkPersistenceTest.scala | 18 +-
.../unit/kafka/server/KafkaConfigTest.scala | 2 +-
.../unit/kafka/server/LeaderElectionTest.scala | 18 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 8 +-
.../unit/kafka/server/LogRecoveryTest.scala | 14 +-
.../unit/kafka/server/OffsetCommitTest.scala | 16 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 11 +-
.../unit/kafka/server/ServerShutdownTest.scala | 2 +-
.../unit/kafka/server/ServerStartupTest.scala | 6 +-
.../unit/kafka/server/SimpleFetchTest.scala | 6 +-
.../unit/kafka/utils/ReplicationUtilsTest.scala | 16 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 57 +--
.../scala/unit/kafka/zk/ZKEphemeralTest.scala | 26 +-
.../test/scala/unit/kafka/zk/ZKPathTest.scala | 48 +--
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 10 +-
91 files changed, 1283 insertions(+), 1109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 247f556..289f1d0 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -31,6 +31,7 @@
<allow pkg="org.powermock" />
<allow pkg="javax.net.ssl" />
+ <allow pkg="javax.security.auth" />
<!-- no one depends on the server -->
<disallow pkg="kafka" />
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
new file mode 100644
index 0000000..ce0be62
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security;
+
+import java.io.File;
+import java.net.URI;
+import java.security.URIParameter;
+import javax.security.auth.login.Configuration;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JaasUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(JaasUtils.class);
+ public static final String LOGIN_CONTEXT_SERVER = "KafkaServer";
+ public static final String LOGIN_CONTEXT_CLIENT = "KafkaClient";
+ public static final String SERVICE_NAME = "serviceName";
+ public static final String JAVA_LOGIN_CONFIG_PARAM = "java.security.auth.login.config";
+ public static final String ZK_SASL_CLIENT = "zookeeper.sasl.client";
+ public static final String ZK_LOGIN_CONTEXT_NAME_KEY = "zookeeper.sasl.clientconfig";
+
+ public static boolean isZkSecurityEnabled(String loginConfigFile) {
+ boolean isSecurityEnabled = false;
+ boolean zkSaslEnabled = Boolean.getBoolean(System.getProperty(ZK_SASL_CLIENT, "true"));
+ String zkLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, "Client");
+
+ if (loginConfigFile != null && loginConfigFile.length() > 0) {
+ File configFile = new File(loginConfigFile);
+ if (!configFile.canRead()) {
+ throw new KafkaException("File " + loginConfigFile + "cannot be read.");
+ }
+ try {
+ URI configUri = configFile.toURI();
+ Configuration loginConf = Configuration.getInstance("JavaLoginConfig", new URIParameter(configUri));
+ isSecurityEnabled = loginConf.getAppConfigurationEntry(zkLoginContextName) != null;
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
+ if (isSecurityEnabled && !zkSaslEnabled) {
+ LOG.error("JAAS file is present, but system property " +
+ ZK_SASL_CLIENT + " is set to false, which disables " +
+ "SASL in the ZooKeeper client");
+ throw new KafkaException("Exception while determining if ZooKeeper is secure");
+ }
+ }
+
+ return isSecurityEnabled;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 9966660..ecc5b9d 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -23,6 +23,7 @@ import kafka.cluster.{BrokerEndPoint, Broker}
import kafka.log.LogConfig
import kafka.server.ConfigType
import kafka.utils._
+import kafka.utils.ZkUtils._
import kafka.api.{TopicMetadata, PartitionMetadata}
import java.util.Random
@@ -103,12 +104,12 @@ object AdminUtils extends Logging {
* @param replicaAssignmentStr Manual replica assignment
* @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing
*/
- def addPartitions(zkClient: ZkClient,
+ def addPartitions(zkUtils: ZkUtils,
topic: String,
numPartitions: Int = 1,
replicaAssignmentStr: String = "",
checkBrokerAvailable: Boolean = true) {
- val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+ val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))
@@ -118,7 +119,7 @@ object AdminUtils extends Logging {
throw new AdminOperationException("The number of partitions for a topic can only be increased")
// create the new partition replication list
- val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+ val brokerList = zkUtils.getSortedBrokerList()
val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "")
AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
else
@@ -134,7 +135,7 @@ object AdminUtils extends Logging {
val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
// add the new list
partitionReplicaList ++= newPartitionReplicaList
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaList, update = true)
}
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {
@@ -159,9 +160,9 @@ object AdminUtils extends Logging {
ret.toMap
}
- def deleteTopic(zkClient: ZkClient, topic: String) {
+ def deleteTopic(zkUtils: ZkUtils, topic: String) {
try {
- ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
+ zkUtils.createPersistentPath(getDeleteTopicPath(topic))
} catch {
case e1: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
"topic %s is already marked for deletion".format(topic))
@@ -169,8 +170,8 @@ object AdminUtils extends Logging {
}
}
- def isConsumerGroupActive(zkClient: ZkClient, group: String) = {
- ZkUtils.getConsumersInGroup(zkClient, group).nonEmpty
+ def isConsumerGroupActive(zkUtils: ZkUtils, group: String) = {
+ zkUtils.getConsumersInGroup(group).nonEmpty
}
/**
@@ -180,10 +181,10 @@ object AdminUtils extends Logging {
* @param group Consumer group
* @return whether or not we deleted the consumer group information
*/
- def deleteConsumerGroupInZK(zkClient: ZkClient, group: String) = {
- if (!isConsumerGroupActive(zkClient, group)) {
+ def deleteConsumerGroupInZK(zkUtils: ZkUtils, group: String) = {
+ if (!isConsumerGroupActive(zkUtils, group)) {
val dir = new ZKGroupDirs(group)
- ZkUtils.deletePathRecursive(zkClient, dir.consumerGroupDir)
+ zkUtils.deletePathRecursive(dir.consumerGroupDir)
true
}
else false
@@ -198,15 +199,15 @@ object AdminUtils extends Logging {
* @param topic Topic of the consumer group information we wish to delete
* @return whether or not we deleted the consumer group information for the given topic
*/
- def deleteConsumerGroupInfoForTopicInZK(zkClient: ZkClient, group: String, topic: String) = {
- val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group)
+ def deleteConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, group: String, topic: String) = {
+ val topics = zkUtils.getTopicsByConsumerGroup(group)
if (topics == Seq(topic)) {
- deleteConsumerGroupInZK(zkClient, group)
+ deleteConsumerGroupInZK(zkUtils, group)
}
- else if (!isConsumerGroupActive(zkClient, group)) {
+ else if (!isConsumerGroupActive(zkUtils, group)) {
val dir = new ZKGroupTopicDirs(group, topic)
- ZkUtils.deletePathRecursive(zkClient, dir.consumerOwnerDir)
- ZkUtils.deletePathRecursive(zkClient, dir.consumerOffsetDir)
+ zkUtils.deletePathRecursive(dir.consumerOwnerDir)
+ zkUtils.deletePathRecursive(dir.consumerOffsetDir)
true
}
else false
@@ -218,25 +219,25 @@ object AdminUtils extends Logging {
* @param zkClient Zookeeper client
* @param topic Topic of the consumer group information we wish to delete
*/
- def deleteAllConsumerGroupInfoForTopicInZK(zkClient: ZkClient, topic: String) {
- val groups = ZkUtils.getAllConsumerGroupsForTopic(zkClient, topic)
- groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic))
+ def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
+ val groups = zkUtils.getAllConsumerGroupsForTopic(topic)
+ groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
}
- def topicExists(zkClient: ZkClient, topic: String): Boolean =
- zkClient.exists(ZkUtils.getTopicPath(topic))
+ def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
+ zkUtils.zkClient.exists(getTopicPath(topic))
- def createTopic(zkClient: ZkClient,
+ def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties) {
- val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+ val brokerList = zkUtils.getSortedBrokerList()
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}
- def createOrUpdateTopicPartitionAssignmentPathInZK(zkClient: ZkClient,
+ def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
config: Properties = new Properties,
@@ -245,13 +246,13 @@ object AdminUtils extends Logging {
Topic.validate(topic)
require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
- val topicPath = ZkUtils.getTopicPath(topic)
+ val topicPath = getTopicPath(topic)
if (!update) {
- if (zkClient.exists(topicPath))
+ if (zkUtils.zkClient.exists(topicPath))
throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
else if (Topic.hasCollisionChars(topic)) {
- val allTopics = ZkUtils.getAllTopics(zkClient)
+ val allTopics = zkUtils.getAllTopics()
val collidingTopics = allTopics.filter(t => Topic.hasCollision(topic, t))
if (collidingTopics.nonEmpty) {
throw new InvalidTopicException("Topic \"%s\" collides with existing topics: %s".format(topic, collidingTopics.mkString(", ")))
@@ -265,24 +266,24 @@ object AdminUtils extends Logging {
if (!update) {
// write out the config if there is any, this isn't transactional with the partition assignments
LogConfig.validate(config)
- writeEntityConfig(zkClient, ConfigType.Topic, topic, config)
+ writeEntityConfig(zkUtils, ConfigType.Topic, topic, config)
}
// create the partition assignment
- writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
+ writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
}
- private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
+ private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
try {
- val zkPath = ZkUtils.getTopicPath(topic)
- val jsonPartitionData = ZkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
+ val zkPath = getTopicPath(topic)
+ val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
if (!update) {
info("Topic creation " + jsonPartitionData.toString)
- ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData)
+ zkUtils.createPersistentPath(zkPath, jsonPartitionData)
} else {
info("Topic update " + jsonPartitionData.toString)
- ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData)
+ zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
}
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
} catch {
@@ -299,8 +300,8 @@ object AdminUtils extends Logging {
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
- def changeClientIdConfig(zkClient: ZkClient, clientId: String, configs: Properties) {
- changeEntityConfig(zkClient, ConfigType.Client, clientId, configs)
+ def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties) {
+ changeEntityConfig(zkUtils, ConfigType.Client, clientId, configs)
}
/**
@@ -311,22 +312,22 @@ object AdminUtils extends Logging {
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
- def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) {
- if(!topicExists(zkClient, topic))
+ def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
+ if(!topicExists(zkUtils, topic))
throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
// remove the topic overrides
LogConfig.validate(configs)
- changeEntityConfig(zkClient, ConfigType.Topic, topic, configs)
+ changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
}
- private def changeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, configs: Properties) {
+ private def changeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties) {
// write the new config--may not exist if there were previously no overrides
- writeEntityConfig(zkClient, entityType, entityName, configs)
+ writeEntityConfig(zkUtils, entityType, entityName, configs)
// create the change notification
val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
val content = Json.encode(getConfigChangeZnodeData(entityType, entityName))
- zkClient.createPersistentSequential(seqNode, content)
+ zkUtils.zkClient.createPersistentSequential(seqNode, content)
}
def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = {
@@ -336,20 +337,20 @@ object AdminUtils extends Logging {
/**
* Write out the topic config to zk, if there is any
*/
- private def writeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, config: Properties) {
+ private def writeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, config: Properties) {
val configMap: mutable.Map[String, String] = {
import JavaConversions._
config
}
val map = Map("version" -> 1, "config" -> configMap)
- ZkUtils.updatePersistentPath(zkClient, ZkUtils.getEntityConfigPath(entityType, entityName), Json.encode(map))
+ zkUtils.updatePersistentPath(getEntityConfigPath(entityType, entityName), Json.encode(map))
}
/**
* Read the entity (topic or client) config (if any) from zk
*/
- def fetchEntityConfig(zkClient: ZkClient, entityType: String, entity: String): Properties = {
- val str: String = zkClient.readData(ZkUtils.getEntityConfigPath(entityType, entity), true)
+ def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entity: String): Properties = {
+ val str: String = zkUtils.zkClient.readData(getEntityConfigPath(entityType, entity), true)
val props = new Properties()
if(str != null) {
Json.parseFull(str) match {
@@ -375,28 +376,28 @@ object AdminUtils extends Logging {
props
}
- def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] =
- ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchEntityConfig(zkClient, ConfigType.Topic, topic))).toMap
+ def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
+ zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
- def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
- fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
+ def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): TopicMetadata =
+ fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker])
- def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = {
+ def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[TopicMetadata] = {
val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
- topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
+ topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo))
}
- private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = {
- if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
- val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
+ private def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = {
+ if(zkUtils.pathExists(getTopicPath(topic))) {
+ val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
val partitionMetadata = sortedPartitions.map { partitionMap =>
val partition = partitionMap._1
val replicas = partitionMap._2
- val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
- val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+ val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partition)
+ val leader = zkUtils.getLeaderForPartition(topic, partition)
debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
var leaderInfo: Option[BrokerEndPoint] = None
@@ -406,15 +407,15 @@ object AdminUtils extends Logging {
leaderInfo = leader match {
case Some(l) =>
try {
- Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head.getBrokerEndPoint(protocol))
+ Some(getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getBrokerEndPoint(protocol))
} catch {
case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
}
case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
}
try {
- replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas).map(_.getBrokerEndPoint(protocol))
- isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas).map(_.getBrokerEndPoint(protocol))
+ replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getBrokerEndPoint(protocol))
+ isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getBrokerEndPoint(protocol))
} catch {
case e: Throwable => throw new ReplicaNotAvailableException(e)
}
@@ -439,7 +440,7 @@ object AdminUtils extends Logging {
}
}
- private def getBrokerInfoFromCache(zkClient: ZkClient,
+ private def getBrokerInfoFromCache(zkUtils: ZkUtils,
cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
brokerIds: Seq[Int]): Seq[Broker] = {
var failedBrokerIds: ListBuffer[Int] = new ListBuffer()
@@ -448,7 +449,7 @@ object AdminUtils extends Logging {
optionalBrokerInfo match {
case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache
case None => // fetch it from zookeeper
- ZkUtils.getBrokerInfo(zkClient, id) match {
+ zkUtils.getBrokerInfo(id) match {
case Some(brokerInfo) =>
cachedBrokerInfo += (id -> brokerInfo)
Some(brokerInfo)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index f0217de..ba4c003 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -26,6 +26,7 @@ import org.I0Itec.zkclient.ZkClient
import scala.collection._
import scala.collection.JavaConversions._
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.security.JaasUtils
/**
@@ -42,52 +43,55 @@ object ConfigCommand {
opts.checkArgs()
- val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
+ val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
+ 30000,
+ 30000,
+ JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
try {
if (opts.options.has(opts.alterOpt))
- alterConfig(zkClient, opts)
+ alterConfig(zkUtils, opts)
else if (opts.options.has(opts.describeOpt))
- describeConfig(zkClient, opts)
+ describeConfig(zkUtils, opts)
} catch {
case e: Throwable =>
println("Error while executing topic command " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
- zkClient.close()
+ zkUtils.close()
}
}
- private def alterConfig(zkClient: ZkClient, opts: ConfigCommandOptions) {
+ private def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
val configsToBeAdded = parseConfigsToBeAdded(opts)
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
val entityType = opts.options.valueOf(opts.entityType)
val entityName = opts.options.valueOf(opts.entityName)
// compile the final set of configs
- val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName)
+ val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
configs.putAll(configsToBeAdded)
configsToBeDeleted.foreach(config => configs.remove(config))
if (entityType.equals(ConfigType.Topic)) {
- AdminUtils.changeTopicConfig(zkClient, entityName, configs)
+ AdminUtils.changeTopicConfig(zkUtils, entityName, configs)
println("Updated config for topic: \"%s\".".format(entityName))
} else {
- AdminUtils.changeClientIdConfig(zkClient, entityName, configs)
+ AdminUtils.changeClientIdConfig(zkUtils, entityName, configs)
println("Updated config for clientId: \"%s\".".format(entityName))
}
}
- private def describeConfig(zkClient: ZkClient, opts: ConfigCommandOptions) {
+ private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
val entityType = opts.options.valueOf(opts.entityType)
val entityNames: Seq[String] =
if (opts.options.has(opts.entityName))
Seq(opts.options.valueOf(opts.entityName))
else
- ZkUtils.getAllEntitiesWithConfig(zkClient, entityType)
+ zkUtils.getAllEntitiesWithConfig(entityType)
for (entityName <- entityNames) {
- val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName)
+ val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
println("Configs for %s:%s are %s"
.format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index f23120e..8efbb2a 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -31,6 +31,7 @@ import scala.collection.{Set, mutable}
import kafka.consumer.SimpleConsumer
import collection.JavaConversions._
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.security.JaasUtils
object ConsumerGroupCommand {
@@ -48,57 +49,60 @@ object ConsumerGroupCommand {
opts.checkArgs()
- val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
+ val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
+ 30000,
+ 30000,
+ JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
try {
if (opts.options.has(opts.listOpt))
- list(zkClient)
+ list(zkUtils)
else if (opts.options.has(opts.describeOpt))
- describe(zkClient, opts)
+ describe(zkUtils, opts)
else if (opts.options.has(opts.deleteOpt))
- delete(zkClient, opts)
+ delete(zkUtils, opts)
} catch {
case e: Throwable =>
println("Error while executing consumer group command " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
- zkClient.close()
+ zkUtils.close()
}
}
- def list(zkClient: ZkClient) {
- ZkUtils.getConsumerGroups(zkClient).foreach(println)
+ def list(zkUtils: ZkUtils) {
+ zkUtils.getConsumerGroups().foreach(println)
}
- def describe(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+ def describe(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
val configs = parseConfigs(opts)
val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
val group = opts.options.valueOf(opts.groupOpt)
- val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group)
+ val topics = zkUtils.getTopicsByConsumerGroup(group)
if (topics.isEmpty) {
println("No topic available for consumer group provided")
}
- topics.foreach(topic => describeTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))
+ topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))
}
- def delete(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+ def delete(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) {
- deleteForTopic(zkClient, opts)
+ deleteForTopic(zkUtils, opts)
}
else if (opts.options.has(opts.groupOpt)) {
- deleteForGroup(zkClient, opts)
+ deleteForGroup(zkUtils, opts)
}
else if (opts.options.has(opts.topicOpt)) {
- deleteAllForTopic(zkClient, opts)
+ deleteAllForTopic(zkUtils, opts)
}
}
- private def deleteForGroup(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+ private def deleteForGroup(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
val groups = opts.options.valuesOf(opts.groupOpt)
groups.foreach { group =>
try {
- if (AdminUtils.deleteConsumerGroupInZK(zkClient, group))
+ if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group))
println("Deleted all consumer group information for group %s in zookeeper.".format(group))
else
println("Delete for group %s failed because its consumers are still active.".format(group))
@@ -110,13 +114,13 @@ object ConsumerGroupCommand {
}
}
- private def deleteForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+ private def deleteForTopic(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
val groups = opts.options.valuesOf(opts.groupOpt)
val topic = opts.options.valueOf(opts.topicOpt)
Topic.validate(topic)
groups.foreach { group =>
try {
- if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic))
+ if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic))
else
println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic))
@@ -128,10 +132,10 @@ object ConsumerGroupCommand {
}
}
- private def deleteAllForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+ private def deleteAllForTopic(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
val topic = opts.options.valueOf(opts.topicOpt)
Topic.validate(topic)
- AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic)
+ AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic))
}
@@ -144,35 +148,35 @@ object ConsumerGroupCommand {
props
}
- private def describeTopic(zkClient: ZkClient,
+ private def describeTopic(zkUtils: ZkUtils,
group: String,
topic: String,
channelSocketTimeoutMs: Int,
channelRetryBackoffMs: Int) {
- val topicPartitions = getTopicPartitions(zkClient, topic)
- val partitionOffsets = getPartitionOffsets(zkClient, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
+ val topicPartitions = getTopicPartitions(zkUtils, topic)
+ val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
println("%s, %s, %s, %s, %s, %s, %s"
.format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))
topicPartitions
.sortBy { case topicPartition => topicPartition.partition }
.foreach { topicPartition =>
- describePartition(zkClient, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition))
+ describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition))
}
}
- private def getTopicPartitions(zkClient: ZkClient, topic: String) = {
- val topicPartitionMap = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
+ private def getTopicPartitions(zkUtils: ZkUtils, topic: String) = {
+ val topicPartitionMap = zkUtils.getPartitionsForTopics(Seq(topic))
val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
partitions.map(TopicAndPartition(topic, _))
}
- private def getPartitionOffsets(zkClient: ZkClient,
+ private def getPartitionOffsets(zkUtils: ZkUtils,
group: String,
topicPartitions: Seq[TopicAndPartition],
channelSocketTimeoutMs: Int,
channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
val offsetMap = mutable.Map[TopicAndPartition, Long]()
- val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)
+ val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
channel.send(OffsetFetchRequest(group, topicPartitions))
val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
@@ -182,7 +186,7 @@ object ConsumerGroupCommand {
// this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
// (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
try {
- val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
+ val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
offsetMap.put(topicAndPartition, offset)
} catch {
case z: ZkNoNodeException =>
@@ -200,20 +204,20 @@ object ConsumerGroupCommand {
offsetMap.toMap
}
- private def describePartition(zkClient: ZkClient,
+ private def describePartition(zkUtils: ZkUtils,
group: String,
topic: String,
partition: Int,
offsetOpt: Option[Long]) {
val topicAndPartition = TopicAndPartition(topic, partition)
val groupDirs = new ZKGroupTopicDirs(group, topic)
- val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/" + partition)._1
- ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
+ val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1
+ zkUtils.getLeaderForPartition(topic, partition) match {
case Some(-1) =>
println("%s, %s, %s, %s, %s, %s, %s"
.format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none")))
case Some(brokerId) =>
- val consumerOpt = getConsumer(zkClient, brokerId)
+ val consumerOpt = getConsumer(zkUtils, brokerId)
consumerOpt match {
case Some(consumer) =>
val request =
@@ -231,9 +235,9 @@ object ConsumerGroupCommand {
}
}
- private def getConsumer(zkClient: ZkClient, brokerId: Int): Option[SimpleConsumer] = {
+ private def getConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = {
try {
- ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
+ zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
case Some(brokerInfoString) =>
Json.parseFull(brokerInfoString) match {
case Some(m) =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 2aa6e62..e74fcb6 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.common.{TopicAndPartition, AdminCommandFailedException}
import collection._
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.security.JaasUtils
object PreferredReplicaLeaderElectionCommand extends Logging {
@@ -51,15 +52,19 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
val zkConnect = options.valueOf(zkConnectOpt)
var zkClient: ZkClient = null
-
+ var zkUtils: ZkUtils = null
try {
zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
+ zkUtils = ZkUtils(zkConnect,
+ 30000,
+ 30000,
+ JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
val partitionsForPreferredReplicaElection =
if (!options.has(jsonFileOpt))
- ZkUtils.getAllPartitions(zkClient)
+ zkUtils.getAllPartitions()
else
parsePreferredReplicaElectionData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
- val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
+ val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkUtils, partitionsForPreferredReplicaElection)
preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection))
@@ -95,18 +100,18 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}
}
- def writePreferredReplicaElectionData(zkClient: ZkClient,
+ def writePreferredReplicaElectionData(zkUtils: ZkUtils,
partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
val partitionsList = partitionsUndergoingPreferredReplicaElection.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
try {
- ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+ zkUtils.createPersistentPath(zkPath, jsonData)
info("Created preferred replica election path with %s".format(jsonData))
} catch {
case nee: ZkNodeExistsException =>
val partitionsUndergoingPreferredReplicaElection =
- PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
+ PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(zkUtils.readData(zkPath)._1)
throw new AdminOperationException("Preferred replica leader election currently in progress for " +
"%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
case e2: Throwable => throw new AdminOperationException(e2.toString)
@@ -114,20 +119,20 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}
}
-class PreferredReplicaLeaderElectionCommand(zkClient: ZkClient, partitions: scala.collection.Set[TopicAndPartition])
+class PreferredReplicaLeaderElectionCommand(zkUtils: ZkUtils, partitions: scala.collection.Set[TopicAndPartition])
extends Logging {
def moveLeaderToPreferredReplica() = {
try {
- val validPartitions = partitions.filter(p => validatePartition(zkClient, p.topic, p.partition))
- PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
+ val validPartitions = partitions.filter(p => validatePartition(zkUtils, p.topic, p.partition))
+ PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, validPartitions)
} catch {
case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
}
}
- def validatePartition(zkClient: ZkClient, topic: String, partition: Int): Boolean = {
+ def validatePartition(zkUtils: ZkUtils, topic: String, partition: Int): Boolean = {
// check if partition exists
- val partitionsOpt = ZkUtils.getPartitionsForTopics(zkClient, List(topic)).get(topic)
+ val partitionsOpt = zkUtils.getPartitionsForTopics(List(topic)).get(topic)
partitionsOpt match {
case Some(partitions) =>
if(partitions.contains(partition)) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index ea34589..10182f6 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.common.{TopicAndPartition, AdminCommandFailedException}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.security.JaasUtils
object ReassignPartitionsCommand extends Logging {
@@ -38,33 +39,37 @@ object ReassignPartitionsCommand extends Logging {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
- var zkClient: ZkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
+ val zkUtils = ZkUtils(zkConnect,
+ 30000,
+ 30000,
+ JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
try {
if(opts.options.has(opts.verifyOpt))
- verifyAssignment(zkClient, opts)
+ verifyAssignment(zkUtils, opts)
else if(opts.options.has(opts.generateOpt))
- generateAssignment(zkClient, opts)
+ generateAssignment(zkUtils, opts)
else if (opts.options.has(opts.executeOpt))
- executeAssignment(zkClient, opts)
+ executeAssignment(zkUtils, opts)
} catch {
case e: Throwable =>
println("Partitions reassignment failed due to " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
+ val zkClient = zkUtils.zkClient
if (zkClient != null)
zkClient.close()
}
}
- def verifyAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
+ def verifyAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
if(!opts.options.has(opts.reassignmentJsonFileOpt))
CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val jsonString = Utils.readFileAsString(jsonFile)
- val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
+ val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentData(jsonString)
println("Status of partition reassignment:")
- val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
+ val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned)
reassignedPartitionsStatus.foreach { partition =>
partition._2 match {
case ReassignmentCompleted =>
@@ -77,7 +82,7 @@ object ReassignPartitionsCommand extends Logging {
}
}
- def generateAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
+ def generateAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt)))
CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
@@ -86,11 +91,11 @@ object ReassignPartitionsCommand extends Logging {
if (duplicateReassignments.nonEmpty)
throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(",")))
val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
- val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
+ val topicsToReassign = zkUtils.parseTopicsData(topicsToMoveJsonString)
val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
if (duplicateTopicsToReassign.nonEmpty)
throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
- val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
+ val topicPartitionsToReassign = zkUtils.getReplicaAssignmentForTopics(topicsToReassign)
var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
@@ -99,18 +104,18 @@ object ReassignPartitionsCommand extends Logging {
topicInfo._2.head._2.size)
partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
}
- val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
+ val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic).toSeq)
println("Current partition replica assignment\n\n%s"
- .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
- println("Proposed partition reassignment configuration\n\n%s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
+ .format(zkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
+ println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
}
- def executeAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
+ def executeAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
if(!opts.options.has(opts.reassignmentJsonFileOpt))
CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option")
val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
- val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
+ val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
if (partitionsToBeReassigned.isEmpty)
throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp})
@@ -125,28 +130,28 @@ object ReassignPartitionsCommand extends Logging {
.mkString(". ")
throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicatesMsg))
}
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap)
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)
// before starting assignment, output the current replica assignment to facilitate rollback
- val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic))
+ val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
- .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
+ .format(zkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
// start the reassignment
if(reassignPartitionsCommand.reassignPartitions())
- println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap)))
+ println("Successfully started reassignment of partitions %s".format(zkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap)))
else
println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
}
- private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
+ private def checkIfReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
:Map[TopicAndPartition, ReassignmentStatus] = {
- val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
+ val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
partitionsToBeReassigned.map { topicAndPartition =>
- (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1,
+ (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkUtils,topicAndPartition._1,
topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
}
}
- def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition,
+ def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, topicAndPartition: TopicAndPartition,
reassignedReplicas: Seq[Int],
partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
@@ -155,7 +160,7 @@ object ReassignPartitionsCommand extends Logging {
case Some(partition) => ReassignmentInProgress
case None =>
// check if the current replica assignment matches the expected one after reassignment
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
+ val assignedReplicas = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition)
if(assignedReplicas == newReplicas)
ReassignmentCompleted
else {
@@ -203,31 +208,31 @@ object ReassignPartitionsCommand extends Logging {
}
}
-class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
+class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
extends Logging {
def reassignPartitions(): Boolean = {
try {
- val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1.topic, p._1.partition))
+ val validPartitions = partitions.filter(p => validatePartition(zkUtils, p._1.topic, p._1.partition))
if(validPartitions.isEmpty) {
false
}
else {
- val jsonReassignmentData = ZkUtils.getPartitionReassignmentZkData(validPartitions)
- ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
+ val jsonReassignmentData = zkUtils.getPartitionReassignmentZkData(validPartitions)
+ zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
true
}
} catch {
case ze: ZkNodeExistsException =>
- val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
+ val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
throw new AdminCommandFailedException("Partition reassignment currently in " +
"progress for %s. Aborting operation".format(partitionsBeingReassigned))
case e: Throwable => error("Admin command failed", e); false
}
}
- def validatePartition(zkClient: ZkClient, topic: String, partition: Int): Boolean = {
+ def validatePartition(zkUtils: ZkUtils, topic: String, partition: Int): Boolean = {
// check if partition exists
- val partitionsOpt = ZkUtils.getPartitionsForTopics(zkClient, List(topic)).get(topic)
+ val partitionsOpt = zkUtils.getPartitionsForTopics(List(topic)).get(topic)
partitionsOpt match {
case Some(partitions) =>
if(partitions.contains(partition)) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 3abac62..9fe2606 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -22,6 +22,7 @@ import java.util.Properties
import kafka.common.{Topic, AdminCommandFailedException}
import kafka.utils.CommandLineUtils
import kafka.utils._
+import kafka.utils.ZkUtils._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import scala.collection._
@@ -30,6 +31,7 @@ import kafka.log.LogConfig
import kafka.consumer.Whitelist
import kafka.server.{ConfigType, OffsetManager}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.security.JaasUtils
import kafka.coordinator.ConsumerCoordinator
@@ -49,33 +51,36 @@ object TopicCommand extends Logging {
opts.checkArgs()
- val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
+ val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
+ 30000,
+ 30000,
+ JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
var exitCode = 0
try {
if(opts.options.has(opts.createOpt))
- createTopic(zkClient, opts)
+ createTopic(zkUtils, opts)
else if(opts.options.has(opts.alterOpt))
- alterTopic(zkClient, opts)
+ alterTopic(zkUtils, opts)
else if(opts.options.has(opts.listOpt))
- listTopics(zkClient, opts)
+ listTopics(zkUtils, opts)
else if(opts.options.has(opts.describeOpt))
- describeTopic(zkClient, opts)
+ describeTopic(zkUtils, opts)
else if(opts.options.has(opts.deleteOpt))
- deleteTopic(zkClient, opts)
+ deleteTopic(zkUtils, opts)
} catch {
case e: Throwable =>
println("Error while executing topic command : " + e.getMessage)
error(Utils.stackTrace(e))
exitCode = 1
} finally {
- zkClient.close()
+ zkUtils.close()
System.exit(exitCode)
}
}
- private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = {
- val allTopics = ZkUtils.getAllTopics(zkClient).sorted
+ private def getTopics(zkUtils: ZkUtils, opts: TopicCommandOptions): Seq[String] = {
+ val allTopics = zkUtils.getAllTopics().sorted
if (opts.options.has(opts.topicOpt)) {
val topicsSpec = opts.options.valueOf(opts.topicOpt)
val topicsFilter = new Whitelist(topicsSpec)
@@ -84,31 +89,31 @@ object TopicCommand extends Logging {
allTopics
}
- def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
+ def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
val topic = opts.options.valueOf(opts.topicOpt)
val configs = parseTopicConfigsToBeAdded(opts)
if (Topic.hasCollisionChars(topic))
println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
if (opts.options.has(opts.replicaAssignmentOpt)) {
val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs, update = false)
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
} else {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
- AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
+ AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs)
}
println("Created topic \"%s\".".format(topic))
}
- def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
- val topics = getTopics(zkClient, opts)
+ def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
+ val topics = getTopics(zkUtils, opts)
if (topics.length == 0) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
opts.options.valueOf(opts.zkConnectOpt)))
}
topics.foreach { topic =>
- val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
+ val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
println("WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.")
println(" Going forward, please use kafka-configs.sh for this functionality")
@@ -118,7 +123,7 @@ object TopicCommand extends Logging {
// compile the final set of configs
configs.putAll(configsToBeAdded)
configsToBeDeleted.foreach(config => configs.remove(config))
- AdminUtils.changeTopicConfig(zkClient, topic, configs)
+ AdminUtils.changeTopicConfig(zkUtils, topic, configs)
println("Updated config for topic \"%s\".".format(topic))
}
@@ -130,16 +135,16 @@ object TopicCommand extends Logging {
"logic or ordering of the messages will be affected")
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
- AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+ AdminUtils.addPartitions(zkUtils, topic, nPartitions, replicaAssignmentStr)
println("Adding partitions succeeded!")
}
}
}
- def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) {
- val topics = getTopics(zkClient, opts)
+ def listTopics(zkUtils: ZkUtils, opts: TopicCommandOptions) {
+ val topics = getTopics(zkUtils, opts)
for(topic <- topics) {
- if (ZkUtils.pathExists(zkClient,ZkUtils.getDeleteTopicPath(topic))) {
+ if (zkUtils.pathExists(getDeleteTopicPath(topic))) {
println("%s - marked for deletion".format(topic))
} else {
println(topic)
@@ -147,8 +152,8 @@ object TopicCommand extends Logging {
}
}
- def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
- val topics = getTopics(zkClient, opts)
+ def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
+ val topics = getTopics(zkUtils, opts)
if (topics.length == 0) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
opts.options.valueOf(opts.zkConnectOpt)))
@@ -158,7 +163,7 @@ object TopicCommand extends Logging {
if (Topic.InternalTopics.contains(topic)) {
throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
} else {
- ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
+ zkUtils.createPersistentPath(getDeleteTopicPath(topic))
println("Topic %s is marked for deletion.".format(topic))
println("Note: This will have no impact if delete.topic.enable is not set to true.")
}
@@ -173,20 +178,20 @@ object TopicCommand extends Logging {
}
}
- def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
- val topics = getTopics(zkClient, opts)
+ def describeTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
+ val topics = getTopics(zkUtils, opts)
val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
val reportOverriddenConfigs = if (opts.options.has(opts.topicsWithOverridesOpt)) true else false
- val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
+ val liveBrokers = zkUtils.getAllBrokersInCluster().map(_.id).toSet
for (topic <- topics) {
- ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match {
+ zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic) match {
case Some(topicPartitionAssignment) =>
val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
val describePartitions: Boolean = !reportOverriddenConfigs
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
if (describeConfigs) {
- val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
+ val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
if (!reportOverriddenConfigs || configs.size() != 0) {
val numPartitions = topicPartitionAssignment.size
val replicationFactor = topicPartitionAssignment.head._2.size
@@ -196,8 +201,8 @@ object TopicCommand extends Logging {
}
if (describePartitions) {
for ((partitionId, assignedReplicas) <- sortedPartitions) {
- val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
- val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
+ val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionId)
+ val leader = zkUtils.getLeaderForPartition(topic, partitionId)
if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
(reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
(reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 68c7e7f..6ae0347 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -27,7 +27,7 @@ import kafka.utils.{CoreUtils, Logging}
import java.util.Properties
import util.Random
import kafka.network.BlockingChannel
-import kafka.utils.ZkUtils._
+import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import java.io.IOException
@@ -108,11 +108,11 @@ object ClientUtils extends Logging{
/**
* Creates a blocking channel to a random broker
*/
- def channelToAnyBroker(zkClient: ZkClient, socketTimeoutMs: Int = 3000) : BlockingChannel = {
+ def channelToAnyBroker(zkUtils: ZkUtils, socketTimeoutMs: Int = 3000) : BlockingChannel = {
var channel: BlockingChannel = null
var connected = false
while (!connected) {
- val allBrokers = getAllBrokerEndPointsForChannel(zkClient, SecurityProtocol.PLAINTEXT)
+ val allBrokers = zkUtils.getAllBrokerEndPointsForChannel(SecurityProtocol.PLAINTEXT)
Random.shuffle(allBrokers).find { broker =>
trace("Connecting to broker %s:%d.".format(broker.host, broker.port))
try {
@@ -137,8 +137,8 @@ object ClientUtils extends Logging{
/**
* Creates a blocking channel to the offset manager of the given group
*/
- def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = {
- var queryChannel = channelToAnyBroker(zkClient)
+ def channelToOffsetManager(group: String, zkUtils: ZkUtils, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = {
+ var queryChannel = channelToAnyBroker(zkUtils)
var offsetManagerChannelOpt: Option[BlockingChannel] = None
@@ -149,7 +149,7 @@ object ClientUtils extends Logging{
while (!coordinatorOpt.isDefined) {
try {
if (!queryChannel.isConnected)
- queryChannel = channelToAnyBroker(zkClient)
+ queryChannel = channelToAnyBroker(zkUtils)
debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
queryChannel.send(ConsumerMetadataRequest(group))
val response = queryChannel.receive()
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ee332ed..59d025b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -42,7 +42,7 @@ class Partition(val topic: String,
replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
private val localBrokerId = replicaManager.config.brokerId
private val logManager = replicaManager.logManager
- private val zkClient = replicaManager.zkClient
+ private val zkUtils = replicaManager.zkUtils
private val assignedReplicaMap = new Pool[Int, Replica]
// The read lock is only required when multiple reads are executed and needs to be in a consistent manner
private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
@@ -87,7 +87,7 @@ class Partition(val topic: String,
case None =>
if (isReplicaLocal(replicaId)) {
val config = LogConfig.fromProps(logManager.defaultConfig.originals,
- AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic))
+ AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
val offsetMap = checkpoint.read
@@ -426,7 +426,7 @@ class Partition(val topic: String,
private def updateIsr(newIsr: Set[Replica]) {
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
- val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partitionId,
+ val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
newLeaderAndIsr, controllerEpoch, zkVersion)
if(updateSucceeded) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index eb44c31..a600d5d 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -44,7 +44,7 @@ trait NotificationHandler {
* @param changeExpirationMs
* @param time
*/
-class ZkNodeChangeNotificationListener(private val zkClient: ZkClient,
+class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
private val seqNodeRoot: String,
private val seqNodePrefix: String,
private val notificationHandler: NotificationHandler,
@@ -56,8 +56,8 @@ class ZkNodeChangeNotificationListener(private val zkClient: ZkClient,
* create seqNodeRoot and begin watching for any new children nodes.
*/
def init() {
- ZkUtils.makeSurePersistentPathExists(zkClient, seqNodeRoot)
- zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
+ zkUtils.makeSurePersistentPathExists(seqNodeRoot)
+ zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
processAllNotifications()
}
@@ -65,7 +65,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: ZkClient,
* Process all changes
*/
def processAllNotifications() {
- val changes = zkClient.getChildren(seqNodeRoot)
+ val changes = zkUtils.zkClient.getChildren(seqNodeRoot)
processNotifications(changes.asScala.sorted)
}
@@ -80,7 +80,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: ZkClient,
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {
val changeZnode = seqNodeRoot + "/" + notification
- val (data, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
+ val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
data map (notificationHandler.processNotification(_)) getOrElse(logger.warn(s"read null data from $changeZnode when processing notification $notification"))
}
lastExecutedChange = changeId
@@ -97,11 +97,11 @@ class ZkNodeChangeNotificationListener(private val zkClient: ZkClient,
private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
for (notification <- notifications.sorted) {
val notificationNode = seqNodeRoot + "/" + notification
- val (data, stat) = ZkUtils.readDataMaybeNull(zkClient, notificationNode)
+ val (data, stat) = zkUtils.readDataMaybeNull(notificationNode)
if (data.isDefined) {
if (now - stat.getCtime > changeExpirationMs) {
debug(s"Purging change notification $notificationNode")
- ZkUtils.deletePath(zkClient, notificationNode)
+ zkUtils.deletePath(notificationNode)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 49b683f..e73faf2 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -26,7 +26,7 @@ import collection.mutable.HashMap
import scala.collection.mutable
import java.util.concurrent.locks.ReentrantLock
import kafka.utils.CoreUtils.inLock
-import kafka.utils.ZkUtils._
+import kafka.utils.ZkUtils
import kafka.utils.{ShutdownableThread, SystemTime}
import kafka.common.TopicAndPartition
import kafka.client.ClientUtils
@@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger
*/
class ConsumerFetcherManager(private val consumerIdString: String,
private val config: ConsumerConfig,
- private val zkClient : ZkClient)
+ private val zkUtils : ZkUtils)
extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
config.clientId, config.numConsumerFetchers) {
private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
@@ -62,7 +62,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
}
trace("Partitions without leader %s".format(noLeaderPartitionSet))
- val brokers = getAllBrokerEndPointsForChannel(zkClient, SecurityProtocol.PLAINTEXT)
+ val brokers = zkUtils.getAllBrokerEndPointsForChannel(SecurityProtocol.PLAINTEXT)
val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
brokers,
config.clientId,
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 849284a..5a1bdd0 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -41,19 +41,19 @@ object PartitionAssignor {
}
}
-class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) {
+class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkUtils: ZkUtils) {
val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]] = {
- val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics)
+ val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkUtils, excludeInternalTopics)
myTopicCount.getConsumerThreadIdsPerTopic
}
val partitionsForTopic: collection.Map[String, Seq[Int]] =
- ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIds.keySet.toSeq)
+ zkUtils.getPartitionsForTopics(myTopicThreadIds.keySet.toSeq)
val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] =
- ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics)
+ zkUtils.getConsumersPerTopic(group, excludeInternalTopics)
- val consumers: Seq[String] = ZkUtils.getConsumersInGroup(zkClient, group).sorted
+ val consumers: Seq[String] = zkUtils.getConsumersInGroup(group).sorted
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 6994c8e..5706d3c 100755
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -56,9 +56,9 @@ private[kafka] object TopicCount extends Logging {
consumerThreadIdsPerTopicMap
}
- def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient, excludeInternalTopics: Boolean) : TopicCount = {
+ def constructTopicCount(group: String, consumerId: String, zkUtils: ZkUtils, excludeInternalTopics: Boolean) : TopicCount = {
val dirs = new ZKGroupDirs(group)
- val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1
+ val topicCountString = zkUtils.readData(dirs.consumerRegistryDir + "/" + consumerId)._1
var subscriptionPattern: String = null
var topMap: Map[String, Int] = null
try {
@@ -94,15 +94,15 @@ private[kafka] object TopicCount extends Logging {
new Whitelist(regex)
else
new Blacklist(regex)
- new WildcardTopicCount(zkClient, consumerId, filter, numStreams, excludeInternalTopics)
+ new WildcardTopicCount(zkUtils, consumerId, filter, numStreams, excludeInternalTopics)
}
}
def constructTopicCount(consumerIdString: String, topicCount: Map[String, Int]) =
new StaticTopicCount(consumerIdString, topicCount)
- def constructTopicCount(consumerIdString: String, filter: TopicFilter, numStreams: Int, zkClient: ZkClient, excludeInternalTopics: Boolean) =
- new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams, excludeInternalTopics)
+ def constructTopicCount(consumerIdString: String, filter: TopicFilter, numStreams: Int, zkUtils: ZkUtils, excludeInternalTopics: Boolean) =
+ new WildcardTopicCount(zkUtils, consumerIdString, filter, numStreams, excludeInternalTopics)
}
@@ -125,13 +125,13 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
def pattern = TopicCount.staticPattern
}
-private[kafka] class WildcardTopicCount(zkClient: ZkClient,
+private[kafka] class WildcardTopicCount(zkUtils: ZkUtils,
consumerIdString: String,
topicFilter: TopicFilter,
numStreams: Int,
excludeInternalTopics: Boolean) extends TopicCount {
def getConsumerThreadIdsPerTopic = {
- val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)
+ val wildcardTopics = zkUtils.getChildrenParentMayNotExist(ZkUtils.BrokerTopicsPath)
.filter(topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics))
TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
}