You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/06 21:54:57 UTC
git commit: Standardize Zk data structures for Re-assign partitions
and Preferred replication election; patched by Swapnil Ghike;
reviewed by Jun Rao; kafka-779
Updated Branches:
refs/heads/0.8 2457bc49e -> eae1bd52e
Standardize Zk data structures for Re-assign partitions and Preferred replication election; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-779
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eae1bd52
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eae1bd52
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eae1bd52
Branch: refs/heads/0.8
Commit: eae1bd52ef1d991834151898e768e57de8bb07bf
Parents: 2457bc4
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Wed Mar 6 12:54:39 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Mar 6 12:54:39 2013 -0800
----------------------------------------------------------------------
.../PreferredReplicaLeaderElectionCommand.scala | 32 +++++---
.../kafka/admin/ReassignPartitionsCommand.scala | 23 ++----
.../src/main/scala/kafka/api/ProducerRequest.scala | 1 -
.../consumer/ZookeeperConsumerConnector.scala | 4 +-
.../main/scala/kafka/tools/KafkaMigrationTool.java | 4 +-
core/src/main/scala/kafka/utils/Utils.scala | 71 ++++++---------
core/src/main/scala/kafka/utils/ZkUtils.scala | 51 +++++++----
.../test/scala/unit/kafka/utils/UtilsTest.scala | 3 +-
8 files changed, 94 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index faf0e3e..e59d5af 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -21,6 +21,8 @@ import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.common.{TopicAndPartition, AdminCommandFailedException}
+import collection._
+import mutable.ListBuffer
object PreferredReplicaLeaderElectionCommand extends Logging {
@@ -28,7 +30,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
val parser = new OptionParser
val jsonFileOpt = parser.accepts("path-to-json-file", "The JSON file with the list of partitions " +
"for which preferred replica leader election should be done, in the following format - \n" +
- "[{\"topic\": \"foo\", \"partition\": \"1\"}, {\"topic\": \"foobar\", \"partition\": \"2\"}]. \n" +
+ "{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\n" +
"Defaults to all existing partitions")
.withRequiredArg
.describedAs("list of partitions for which preferred replica leader election needs to be triggered")
@@ -67,14 +69,18 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}
}
- def parsePreferredReplicaJsonData(jsonString: String): Set[TopicAndPartition] = {
+ def parsePreferredReplicaJsonData(jsonString: String): immutable.Set[TopicAndPartition] = {
Json.parseFull(jsonString) match {
- case Some(partitionList) =>
- val partitions = (partitionList.asInstanceOf[List[Any]])
- Set.empty[TopicAndPartition] ++ partitions.map { m =>
- val topic = m.asInstanceOf[Map[String, String]].get("topic").get
- val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
- TopicAndPartition(topic, partition)
+ case Some(m) =>
+ m.asInstanceOf[Map[String, Any]].get("partitions") match {
+ case Some(partitionsList) =>
+ val partitions = partitionsList.asInstanceOf[List[Map[String, Any]]]
+ partitions.map { p =>
+ val topic = p.get("topic").get.asInstanceOf[String]
+ val partition = p.get("partition").get.asInstanceOf[Int]
+ TopicAndPartition(topic, partition)
+ }.toSet
+ case None => throw new AdministrationException("Preferred replica election data is empty")
}
case None => throw new AdministrationException("Preferred replica election data is empty")
}
@@ -83,9 +89,13 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
def writePreferredReplicaElectionData(zkClient: ZkClient,
partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
- val jsonData = Utils.seqToJson(partitionsUndergoingPreferredReplicaElection.map { p =>
- Utils.mapToJson(Map(("topic" -> p.topic), ("partition" -> p.partition.toString)), valueInQuotes = true)
- }.toSeq.sorted, valueInQuotes = false)
+ var partitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
+ for (p <- partitionsUndergoingPreferredReplicaElection) {
+ partitionsData += Utils.mergeJsonFields(Utils.mapToJsonFields(Map("topic" -> p.topic), valueInQuotes = true) ++
+ Utils.mapToJsonFields(Map("partition" -> p.partition.toString), valueInQuotes = false))
+ }
+ val jsonPartitionsData = Utils.seqToJson(partitionsData, valueInQuotes = false)
+ val jsonData = Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonPartitionsData), valueInQuotes = false)
try {
ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
info("Created preferred replica election path with %s".format(jsonData))
http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index d3845cc..8d287f4 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -28,7 +28,7 @@ object ReassignPartitionsCommand extends Logging {
val parser = new OptionParser
val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
"new replicas they should be reassigned to in the following format - \n" +
- "[{\"topic\": \"foo\", \"partition\": \"1\", \"replicas\": \"1,2,3\" }]")
+ "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }]\n}")
.withRequiredArg
.describedAs("partition reassignment json file path")
.ofType(classOf[String])
@@ -55,18 +55,9 @@ object ReassignPartitionsCommand extends Logging {
try {
// read the json file into a string
- val partitionsToBeReassigned = Json.parseFull(jsonString) match {
- case Some(reassignedPartitions) =>
- val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
- partitions.map { m =>
- val topic = m.asInstanceOf[Map[String, String]].get("topic").get
- val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
- val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get
- val newReplicas = replicasList.split(",").map(_.toInt)
- (TopicAndPartition(topic, partition), newReplicas.toSeq)
- }.toMap
- case None => throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile))
- }
+ val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
+ if (partitionsToBeReassigned.isEmpty)
+ throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile))
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
@@ -86,15 +77,15 @@ object ReassignPartitionsCommand extends Logging {
}
}
-class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.immutable.Map[TopicAndPartition, Seq[Int]])
+class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
extends Logging {
def reassignPartitions(): Boolean = {
try {
val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1.topic, p._1.partition))
- val jsonReassignmentData = Utils.mapWithSeqValuesToJson(validPartitions.map(p => ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2))
+ val jsonReassignmentData = ZkUtils.getPartitionReassignmentZkData(validPartitions)
ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
true
- }catch {
+ } catch {
case ze: ZkNodeExistsException =>
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
throw new AdminCommandFailedException("Partition reassignment currently in " +
http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 1e05d7e..fda3e39 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -19,7 +19,6 @@ package kafka.api
import java.nio._
import kafka.message._
-import scala.collection.Map
import kafka.api.ApiUtils._
import kafka.common._
import kafka.network.RequestChannel.Response
http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c9e4127..b266f3f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -213,8 +213,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
info("begin registering consumer " + consumerIdString + " in ZK")
val consumerRegistrationInfo =
- Utils.mergeJsonObjects(Seq(Utils.mapToJson(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false),
- Utils.mapToJson(Map("pattern" -> topicCount.pattern), valueInQuotes = true)))
+ Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
+ ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true))
createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
info("end registering consumer " + consumerIdString + " in ZK")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index f3a5095..7f0d1ce 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -35,7 +35,9 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index fe4c925..c639efb 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -465,27 +465,42 @@ object Utils extends Logging {
def nullOrEmpty(s: String): Boolean = s == null || s.equals("")
/**
- * Format a Map[String, String] as JSON object.
+ * Merge JSON fields of the format "key" : value/object/array.
*/
- def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = {
+ def mergeJsonFields(objects: Seq[String]): String = {
val builder = new StringBuilder
builder.append("{ ")
- var numElements = 0
- for ( (key, value) <- jsonDataMap.toList.sorted) {
- if (numElements > 0)
- builder.append(", ")
+ builder.append(objects.sorted.map(_.trim).mkString(", "))
+ builder.append(" }")
+ builder.toString
+ }
+
+ /**
+ * Format a Map[String, String] as JSON object.
+ */
+ def mapToJsonFields(jsonDataMap: Map[String, String], valueInQuotes: Boolean): Seq[String] = {
+ val jsonFields: mutable.ListBuffer[String] = ListBuffer()
+ val builder = new StringBuilder
+ for ((key, value) <- jsonDataMap.toList.sorted) {
builder.append("\"" + key + "\":")
if (valueInQuotes)
builder.append("\"" + value + "\"")
else
builder.append(value)
- numElements += 1
+ jsonFields += builder.toString
+ builder.clear()
}
- builder.append(" }")
- builder.toString
+ jsonFields
}
/**
+ * Format a Map[String, String] as JSON object.
+ */
+ def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = {
+ mergeJsonFields(mapToJsonFields(jsonDataMap, valueInQuotes))
+ }
+
+ /**
* Format a Seq[String] as JSON array.
*/
def seqToJson(jsonData: Seq[String], valueInQuotes: Boolean): String = {
@@ -504,45 +519,11 @@ object Utils extends Logging {
*/
def mapWithSeqValuesToJson(jsonDataMap: Map[String, Seq[Int]]): String = {
- val builder = new StringBuilder
- builder.append("{ ")
- var numElements = 0
- for ((key, value) <- jsonDataMap.toList.sortBy(_._1)) {
- if (numElements > 0)
- builder.append(", ")
- builder.append("\"" + key + "\": ")
- builder.append(Utils.seqToJson(value.map(_.toString), valueInQuotes = false))
- numElements += 1
- }
- builder.append(" }")
- builder.toString
- }
-
-
- /**
- * Merge arbitrary JSON objects.
- */
- def mergeJsonObjects(objects: Seq[String]): String = {
- val builder = new StringBuilder
- builder.append("{ ")
- var obs = List[String]()
- objects.foreach(ob => obs = obs ::: getJsonContents(ob).split(',').toList)
- obs = obs.sorted.map(_.trim)
- builder.append(obs.mkString(", "))
- builder.append(" }")
- builder.toString
+ mergeJsonFields(mapToJsonFields(jsonDataMap.map(e => (e._1 -> seqToJson(e._2.map(_.toString), valueInQuotes = false))),
+ valueInQuotes = false))
}
/**
- * Get the contents of a JSON object or array.
- */
- def getJsonContents(str: String): String = {
- str.trim().substring(1, str.length - 1)
- }
-
-
-
- /**
* Create a circular (looping) iterator over a collection.
* @param coll An iterable over the underlying collection.
* @return A circular iterator over the collection.
http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f0aba12..9a0e250 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -24,6 +24,7 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
import org.I0Itec.zkclient.serialize.ZkSerializer
import collection._
import kafka.api.LeaderAndIsr
+import mutable.ListBuffer
import org.apache.zookeeper.data.Stat
import java.util.concurrent.locks.{ReentrantLock, Condition}
import kafka.admin._
@@ -183,8 +184,9 @@ object ZkUtils extends Logging {
def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val brokerInfo =
- Utils.mergeJsonObjects(Seq(Utils.mapToJson(Map("host" -> host), valueInQuotes = true),
- Utils.mapToJson(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString), valueInQuotes = false)))
+ Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
+ Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString),
+ valueInQuotes = false))
try {
createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
} catch {
@@ -209,7 +211,7 @@ object ZkUtils extends Logging {
* Get JSON partition to replica map from zookeeper.
*/
def replicaAssignmentZkdata(map: Map[String, Seq[Int]]): String = {
- val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map.map(e => (e._1.toString -> e._2)))
+ val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map)
Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonReplicaAssignmentMap), valueInQuotes = false)
}
@@ -559,26 +561,41 @@ object ZkUtils extends Logging {
jsonPartitionMapOpt match {
case Some(jsonPartitionMap) =>
val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap)
- reassignedPartitions.map { p =>
- val newReplicas = p._2
- (p._1 -> new ReassignedPartitionsContext(newReplicas))
- }
+ reassignedPartitions.map(p => (p._1 -> new ReassignedPartitionsContext(p._2)))
case None => Map.empty[TopicAndPartition, ReassignedPartitionsContext]
}
}
- def parsePartitionReassignmentData(jsonData: String):Map[TopicAndPartition, Seq[Int]] = {
+ def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
+ val reassignedPartitions: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map()
Json.parseFull(jsonData) match {
case Some(m) =>
- val replicaMap = m.asInstanceOf[Map[String, Seq[Int]]]
- replicaMap.map { reassignedPartitions =>
- val topic = reassignedPartitions._1.split(",").head.trim
- val partition = reassignedPartitions._1.split(",").last.trim.toInt
- val newReplicas = reassignedPartitions._2
- TopicAndPartition(topic, partition) -> newReplicas
+ m.asInstanceOf[Map[String, Any]].get("partitions") match {
+ case Some(partitionsSeq) =>
+ partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].foreach(p => {
+ val topic = p.get("topic").get.asInstanceOf[String]
+ val partition = p.get("partition").get.asInstanceOf[Int]
+ val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]]
+ reassignedPartitions += TopicAndPartition(topic, partition) -> newReplicas
+ })
+ case None =>
}
- case None => Map.empty[TopicAndPartition, Seq[Int]]
+ case None =>
+ }
+ reassignedPartitions
+ }
+
+ def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
+ var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
+ for (p <- partitionsToBeReassigned) {
+ val jsonReplicasData = Utils.seqToJson(p._2.map(_.toString), valueInQuotes = false)
+ val jsonTopicData = Utils.mapToJsonFields(Map("topic" -> p._1.topic), valueInQuotes = true)
+ val jsonPartitionData = Utils.mapToJsonFields(Map("partition" -> p._1.partition.toString, "replicas" -> jsonReplicasData),
+ valueInQuotes = false)
+ jsonPartitionsData += Utils.mergeJsonFields(jsonTopicData ++ jsonPartitionData)
}
+ Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> Utils.seqToJson(jsonPartitionsData.toSeq, valueInQuotes = false)),
+ valueInQuotes = false)
}
def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
@@ -588,11 +605,11 @@ object ZkUtils extends Logging {
deletePath(zkClient, zkPath)
info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath))
case _ =>
- val jsonData = Utils.mapWithSeqValuesToJson(partitionsToBeReassigned.map(p => ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2))
+ val jsonData = getPartitionReassignmentZkData(partitionsToBeReassigned)
try {
updatePersistentPath(zkClient, zkPath, jsonData)
info("Updated partition reassignment path with %s".format(jsonData))
- }catch {
+ } catch {
case nne: ZkNoNodeException =>
ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
http://git-wip-us.apache.org/repos/asf/kafka/blob/eae1bd52/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index 0b6244f..6b21554 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -23,8 +23,7 @@ import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.Assert._
import kafka.common.KafkaException
-import org.junit.{Test}
-import kafka.tools.KafkaMigrationTool
+import org.junit.Test
class UtilsTest extends JUnitSuite {