You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/26 01:38:44 UTC
kafka git commit: MINOR: Move a few methods from the `ZkUtils` class
to the companion object
Repository: kafka
Updated Branches:
refs/heads/trunk 53651937f -> 32feed25a
MINOR: Move a few methods from the `ZkUtils` class to the companion object
They don't require access to `ZkClient`.
Also include a few obvious clean-ups in `ZkUtils`:
* Remove redundant rethrows and braces
* Use named arguments for booleans
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Gwen Shapira <cs...@gmail.com>
Closes #1775 from ijuma/move-some-zk-utils-methods-to-companion-object
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32feed25
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32feed25
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32feed25
Branch: refs/heads/trunk
Commit: 32feed25aefdda3a9f2b780d0709a3002777c9df
Parents: 5365193
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Aug 26 00:41:23 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Aug 26 00:41:23 2016 +0100
----------------------------------------------------------------------
.../kafka/admin/ReassignPartitionsCommand.scala | 16 +--
.../kafka/controller/KafkaController.scala | 2 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 132 +++++++++----------
.../admin/ReassignPartitionsClusterTest.scala | 9 +-
4 files changed, 77 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/32feed25/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 18f741e..0b32d93 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -62,7 +62,7 @@ object ReassignPartitionsCommand extends Logging {
CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val jsonString = Utils.readFileAsString(jsonFile)
- val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentData(jsonString)
+ val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
println("Status of partition reassignment:")
val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned)
@@ -89,12 +89,12 @@ object ReassignPartitionsCommand extends Logging {
val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
val disableRackAware = opts.options.has(opts.disableRackAware)
val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware)
- println("Current partition replica assignment\n\n%s".format(zkUtils.formatAsReassignmentJson(currentAssignments)))
- println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.formatAsReassignmentJson(proposedAssignments)))
+ println("Current partition replica assignment\n\n%s".format(ZkUtils.formatAsReassignmentJson(currentAssignments)))
+ println("Proposed partition reassignment configuration\n\n%s".format(ZkUtils.formatAsReassignmentJson(proposedAssignments)))
}
def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = {
- val topicsToReassign = zkUtils.parseTopicsData(topicsToMoveJsonString)
+ val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
if (duplicateTopicsToReassign.nonEmpty)
throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
@@ -126,7 +126,7 @@ object ReassignPartitionsCommand extends Logging {
def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String) {
- val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
+ val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
if (partitionsToBeReassigned.isEmpty)
throw new AdminCommandFailedException("Partition reassignment data file is empty")
val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map { case (tp, _) => tp })
@@ -145,10 +145,10 @@ object ReassignPartitionsCommand extends Logging {
// before starting assignment, output the current replica assignment to facilitate rollback
val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
- .format(zkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
+ .format(ZkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
// start the reassignment
if (reassignPartitionsCommand.reassignPartitions())
- println("Successfully started reassignment of partitions %s".format(zkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap)))
+ println("Successfully started reassignment of partitions %s".format(ZkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap)))
else
println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
}
@@ -226,7 +226,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: collection.Map[Top
val validPartitions = partitions.filter(p => validatePartition(zkUtils, p._1.topic, p._1.partition))
if (validPartitions.isEmpty) false
else {
- val jsonReassignmentData = zkUtils.formatAsReassignmentJson(validPartitions)
+ val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)
zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
true
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/32feed25/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0d6f048..04bd3f4 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1259,7 +1259,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
def handleDataChange(dataPath: String, data: Object) {
debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
.format(dataPath, data))
- val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString)
+ val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/32feed25/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 3788ef4..a137da8 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -120,6 +120,61 @@ object ZkUtils {
def getDeleteTopicPath(topic: String): String =
DeleteTopicsPath + "/" + topic
+
+ // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
+ def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = {
+ Json.parseFull(jsonData) match {
+ case Some(m) =>
+ m.asInstanceOf[Map[String, Any]].get("partitions") match {
+ case Some(partitionsSeq) =>
+ partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => {
+ val topic = p.get("topic").get.asInstanceOf[String]
+ val partition = p.get("partition").get.asInstanceOf[Int]
+ val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]]
+ TopicAndPartition(topic, partition) -> newReplicas
+ })
+ case None =>
+ Seq.empty
+ }
+ case None =>
+ Seq.empty
+ }
+ }
+
+ def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] =
+ parsePartitionReassignmentDataWithoutDedup(jsonData).toMap
+
+ def parseTopicsData(jsonData: String): Seq[String] = {
+ var topics = List.empty[String]
+ Json.parseFull(jsonData) match {
+ case Some(m) =>
+ m.asInstanceOf[Map[String, Any]].get("topics") match {
+ case Some(partitionsSeq) =>
+ val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]]
+ mapPartitionSeq.foreach(p => {
+ val topic = p.get("topic").get.asInstanceOf[String]
+ topics ++= List(topic)
+ })
+ case None =>
+ }
+ case None =>
+ }
+ topics
+ }
+
+ def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
+ Json.encode(Map(
+ "version" -> 1,
+ "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) =>
+ Map(
+ "topic" -> topic,
+ "partition" -> partition,
+ "replicas" -> replicas
+ )
+ }
+ ))
+ }
+
}
class ZkUtils(val zkClient: ZkClient,
@@ -337,7 +392,7 @@ class ZkUtils(val zkClient: ZkClient,
} else acls
if (!zkClient.exists(path))
- ZkPath.createPersistent(zkClient, path, true, acl) //won't throw NoNodeException or NodeExistsException
+ ZkPath.createPersistent(zkClient, path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
}
/**
@@ -346,7 +401,7 @@ class ZkUtils(val zkClient: ZkClient,
private def createParentPath(path: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
val parentDir = path.substring(0, path.lastIndexOf('/'))
if (parentDir.length != 0) {
- ZkPath.createPersistent(zkClient, parentDir, true, acls)
+ ZkPath.createPersistent(zkClient, parentDir, createParents = true, acls)
}
}
@@ -357,10 +412,9 @@ class ZkUtils(val zkClient: ZkClient,
try {
ZkPath.createEphemeral(zkClient, path, data, acls)
} catch {
- case e: ZkNoNodeException => {
+ case e: ZkNoNodeException =>
createParentPath(path)
ZkPath.createEphemeral(zkClient, path, data, acls)
- }
}
}
@@ -372,14 +426,13 @@ class ZkUtils(val zkClient: ZkClient,
try {
createEphemeralPath(path, data, acls)
} catch {
- case e: ZkNodeExistsException => {
+ case e: ZkNodeExistsException =>
// this can happen when there is connection loss; make sure the data is what we intend to write
var storedData: String = null
try {
storedData = readData(path)._1
} catch {
case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
- case e2: Throwable => throw e2
}
if (storedData == null || storedData != data) {
info("conflict in " + path + " data: " + data + " stored data: " + storedData)
@@ -388,8 +441,6 @@ class ZkUtils(val zkClient: ZkClient,
// otherwise, the creation succeeded, return normally
info(path + " exists with value " + data + " during connection loss; this is ok")
}
- }
- case e2: Throwable => throw e2
}
}
@@ -400,10 +451,9 @@ class ZkUtils(val zkClient: ZkClient,
try {
ZkPath.createPersistent(zkClient, path, data, acls)
} catch {
- case e: ZkNoNodeException => {
+ case e: ZkNoNodeException =>
createParentPath(path)
ZkPath.createPersistent(zkClient, path, data, acls)
- }
}
}
@@ -420,17 +470,14 @@ class ZkUtils(val zkClient: ZkClient,
try {
zkClient.writeData(path, data)
} catch {
- case e: ZkNoNodeException => {
+ case e: ZkNoNodeException =>
createParentPath(path)
try {
ZkPath.createPersistent(zkClient, path, data, acls)
} catch {
case e: ZkNodeExistsException =>
zkClient.writeData(path, data)
- case e2: Throwable => throw e2
}
- }
- case e2: Throwable => throw e2
}
}
@@ -493,11 +540,9 @@ class ZkUtils(val zkClient: ZkClient,
try {
zkClient.writeData(path, data)
} catch {
- case e: ZkNoNodeException => {
+ case e: ZkNoNodeException =>
createParentPath(path)
ZkPath.createEphemeral(zkClient, path, data, acls)
- }
- case e2: Throwable => throw e2
}
}
@@ -509,7 +554,6 @@ class ZkUtils(val zkClient: ZkClient,
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
false
- case e2: Throwable => throw e2
}
}
@@ -533,7 +577,6 @@ class ZkUtils(val zkClient: ZkClient,
case e: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
- case e2: Throwable => throw e2
}
}
@@ -544,13 +587,12 @@ class ZkUtils(val zkClient: ZkClient,
}
def readDataMaybeNull(path: String): (Option[String], Stat) = {
- val stat: Stat = new Stat()
+ val stat = new Stat()
val dataAndStat = try {
(Some(zkClient.readData(path, stat)), stat)
} catch {
case e: ZkNoNodeException =>
(None, stat)
- case e2: Throwable => throw e2
}
dataAndStat
}
@@ -568,7 +610,6 @@ class ZkUtils(val zkClient: ZkClient,
zkClient.getChildren(path)
} catch {
case e: ZkNoNodeException => Nil
- case e2: Throwable => throw e2
}
}
@@ -668,53 +709,6 @@ class ZkUtils(val zkClient: ZkClient,
}
}
- // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
- def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = {
- Json.parseFull(jsonData) match {
- case Some(m) =>
- m.asInstanceOf[Map[String, Any]].get("partitions") match {
- case Some(partitionsSeq) =>
- partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => {
- val topic = p.get("topic").get.asInstanceOf[String]
- val partition = p.get("partition").get.asInstanceOf[Int]
- val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]]
- TopicAndPartition(topic, partition) -> newReplicas
- })
- case None =>
- Seq.empty
- }
- case None =>
- Seq.empty
- }
- }
-
- def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
- parsePartitionReassignmentDataWithoutDedup(jsonData).toMap
- }
-
- def parseTopicsData(jsonData: String): Seq[String] = {
- var topics = List.empty[String]
- Json.parseFull(jsonData) match {
- case Some(m) =>
- m.asInstanceOf[Map[String, Any]].get("topics") match {
- case Some(partitionsSeq) =>
- val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]]
- mapPartitionSeq.foreach(p => {
- val topic = p.get("topic").get.asInstanceOf[String]
- topics ++= List(topic)
- })
- case None =>
- }
- case None =>
- }
- topics
- }
-
- def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
- Json.encode(Map("version" -> 1, "partitions" -> partitionsToBeReassigned.map(e => Map("topic" -> e._1.topic, "partition" -> e._1.partition,
- "replicas" -> e._2))))
- }
-
def updatePartitionReassignmentData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
val zkPath = ReassignPartitionsPath
partitionsToBeReassigned.size match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/32feed25/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index ac2c1ae..791c4d2 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -16,10 +16,11 @@ import kafka.admin.ReassignPartitionsCommand
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestUtils._
import kafka.utils.ZkUtils._
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.{CoreUtils, Logging, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.junit.{After, Before, Test}
import org.junit.Assert.assertEquals
+
import scala.collection.Seq
@@ -73,7 +74,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
//When rebalancing
val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1
- ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment))
+ ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
waitForReasignmentToComplete()
//Then the replicas should span all three brokers
@@ -94,7 +95,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
//When rebalancing
val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1
- ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment))
+ ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment))
waitForReasignmentToComplete()
//Then replicas should only span the first two brokers
@@ -103,7 +104,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
}
def waitForReasignmentToComplete() {
- waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode $zkUtils.ReassignPartitionsPath wasn't deleted")
+ waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode ${ZkUtils.ReassignPartitionsPath} wasn't deleted")
}
def json(topic: String): String = {