You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/11/20 19:05:02 UTC

git commit: kafka-1135; Code cleanup - use Json.encode() to write json data to zk; patched by Swapnil Ghike; reviewed by Neha Narkhede, Guozhang Wang and Jun Rao

Updated Branches:
  refs/heads/trunk 440e45e51 -> 9b0776d15


kafka-1135; Code cleanup - use Json.encode() to write json data to zk; patched by Swapnil Ghike; reviewed by Neha Narkhede, Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b0776d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b0776d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b0776d1

Branch: refs/heads/trunk
Commit: 9b0776d157afd9eacddb84a99f2420fa9c0d505b
Parents: 440e45e
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Wed Nov 20 10:05:40 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 20 10:05:40 2013 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  2 +-
 .../PreferredReplicaLeaderElectionCommand.scala |  9 +--
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |  6 +-
 .../main/scala/kafka/consumer/TopicCount.scala  | 23 +-------
 .../consumer/ZookeeperConsumerConnector.scala   |  8 +--
 .../kafka/controller/KafkaController.scala      |  2 +-
 .../main/scala/kafka/log/FileMessageSet.scala   |  2 -
 core/src/main/scala/kafka/log/Log.scala         | 28 +++++-----
 core/src/main/scala/kafka/log/LogManager.scala  |  2 -
 core/src/main/scala/kafka/log/OffsetIndex.scala | 40 ++++++-------
 .../kafka/server/ZookeeperLeaderElector.scala   |  6 +-
 core/src/main/scala/kafka/utils/Utils.scala     | 59 --------------------
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 33 ++++-------
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  5 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 22 ++++----
 .../test/scala/unit/kafka/utils/TestUtils.scala | 10 +---
 16 files changed, 72 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 8ff4bd5..a167756 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -182,7 +182,7 @@ object AdminUtils extends Logging {
   private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
     try {
       val zkPath = ZkUtils.getTopicPath(topic)
-      val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2)))
+      val jsonPartitionData = ZkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
 
       if (!update) {
         info("Topic creation " + jsonPartitionData.toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 26beb96..9b3c6ae 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -89,13 +89,8 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
   def writePreferredReplicaElectionData(zkClient: ZkClient,
                                         partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
     val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
-    var partitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
-    for (p <- partitionsUndergoingPreferredReplicaElection) {
-      partitionsData += Utils.mergeJsonFields(Utils.mapToJsonFields(Map("topic" -> p.topic), valueInQuotes = true) ++
-                                               Utils.mapToJsonFields(Map("partition" -> p.partition.toString), valueInQuotes = false))
-    }
-    val jsonPartitionsData = Utils.seqToJson(partitionsData, valueInQuotes = false)
-    val jsonData = Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonPartitionsData), valueInQuotes = false)
+    val partitionsList = partitionsUndergoingPreferredReplicaElection.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
+    val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
     try {
       ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
       info("Created preferred replica election path with %s".format(jsonData))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 981d2bb..3401afa 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -37,11 +37,7 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int
   def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
 
   override def toString(): String = {
-    val jsonDataMap = new collection.mutable.HashMap[String, String]
-    jsonDataMap.put("leader", leader.toString)
-    jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
-    jsonDataMap.put("ISR", isr.mkString(","))
-    Utils.mapToJson(jsonDataMap, valueInQuotes = true)
+    Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index a3eb53e..e332633 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -25,7 +25,7 @@ import kafka.common.KafkaException
 private[kafka] trait TopicCount {
 
   def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
-  def dbString: String
+  def getTopicCountMap: Map[String, Int]
   def pattern: String
   
   protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
@@ -111,24 +111,7 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
     }
   }
 
-  /**
-   *  return json of
-   *  { "topic1" : 4,
-   *    "topic2" : 4 }
-   */
-  def dbString = {
-    val builder = new StringBuilder
-    builder.append("{ ")
-    var i = 0
-    for ( (topic, nConsumers) <- topicCountMap) {
-      if (i > 0)
-        builder.append(",")
-      builder.append("\"" + topic + "\": " + nConsumers)
-      i += 1
-    }
-    builder.append(" }")
-    builder.toString()
-  }
+  def getTopicCountMap = topicCountMap
 
   def pattern = TopicCount.staticPattern
 }
@@ -142,7 +125,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient,
     makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
   }
 
-  def dbString = "{ \"%s\" : %d }".format(topicFilter.regex, numStreams)
+  def getTopicCountMap = Map(topicFilter.regex -> numStreams)
 
   def pattern: String = {
     topicFilter match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c0350cd..6d0cfa6 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -220,11 +220,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
     info("begin registering consumer " + consumerIdString + " in ZK")
     val timestamp = SystemTime.milliseconds.toString
-    val consumerRegistrationInfo =
-      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
-                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
+    val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
+                                                  "timestamp" -> timestamp))
 
-    createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, (consumerZKString, consumer) => true, config.zkSessionTimeoutMs)
+    createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null,
+                                                 (consumerZKString, consumer) => true, config.zkSessionTimeoutMs)
     info("end registering consumer " + consumerIdString + " in ZK")
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 88792c2..4c319ab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -722,7 +722,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
                                          newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
     try {
       val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic)
-      val jsonPartitionMap = ZkUtils.replicaAssignmentZkdata(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
+      val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
       ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index e1f8b97..6c099da 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -123,8 +123,6 @@ class FileMessageSet private[kafka](@volatile var file: File,
       if(offset >= targetOffset)
         return OffsetPosition(offset, position)
       val messageSize = buffer.getInt()
-      if(messageSize < Message.MessageOverhead)
-        throw new IllegalStateException("Invalid message size: " + messageSize)
       position += MessageSet.LogOverhead + messageSize
     }
     null

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 1883a53..9205128 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -155,19 +155,26 @@ class Log(val dir: File,
       activeSegment.index.resize(config.maxIndexSize)
     }
 
-    // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
-    for (s <- logSegments)
-      s.index.sanityCheck()
+    // sanity check the index file of every segment, if it's empty or its last offset is greater than its base offset.
+    for (s <- logSegments) {
+      require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset,
+              "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
+              .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
+    }
   }
   
   private def recoverLog() {
-    // if we have the clean shutdown marker, skip recovery
-    if(hasCleanShutdownFile) {
-      this.recoveryPoint = activeSegment.nextOffset
+    val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
+    val needsRecovery = !(new File(dir.getParentFile, CleanShutdownFile)).exists()
+    if(!needsRecovery) {
+      this.recoveryPoint = lastOffset
+      return
+    }
+    if(lastOffset <= this.recoveryPoint) {
+      info("Log '%s' is fully intact, skipping recovery".format(name))
+      this.recoveryPoint = lastOffset
       return
     }
-
-    // okay we need to actually recover this log
     val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
     while(unflushed.hasNext) {
       val curr = unflushed.next
@@ -189,11 +196,6 @@ class Log(val dir: File,
       }
     }
   }
-  
-  /**
-   * Check if we have the "clean shutdown" file
-   */
-  private def hasCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists()
 
   /**
    * The number of segments in the log.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 81be88a..390b759 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -175,8 +175,6 @@ class LogManager(val logDirs: Array[File],
       allLogs.foreach(_.close())
       // update the last flush point
       checkpointRecoveryPointOffsets()
-      // mark that the shutdown was clean by creating the clean shutdown marker file
-      logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()))
     } finally {
       // regardless of whether the close succeeded, we need to unlock the data directories
       dirLocks.foreach(_.destroy())

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 96571b3..2f4e303 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -69,8 +69,12 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
           raf.setLength(roundToExactMultiple(maxIndexSize, 8))
         }
           
+        val len = raf.length()  
+        if(len < 0 || len % 8 != 0)
+          throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + 
+                                          " bytes which is not positive or not a multiple of 8.")
+          
         /* memory-map the file */
-        val len = raf.length()
         val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
           
         /* set the position in the index for the next entry */
@@ -95,20 +99,22 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   var maxEntries = mmap.limit / 8
   
   /* the last offset in the index */
-  var lastOffset = readLastEntry.offset
+  var lastOffset = readLastOffset()
   
   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
     .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
 
   /**
-   * The last entry in the index
+   * The last offset written to the index
    */
-  def readLastEntry(): OffsetPosition = {
+  private def readLastOffset(): Long = {
     inLock(lock) {
-      size.get match {
-        case 0 => OffsetPosition(baseOffset, 0)
-        case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1))
-      }
+      val offset = 
+        size.get match {
+          case 0 => 0
+          case s => relativeOffset(this.mmap, s-1)
+        }
+      baseOffset + offset
     }
   }
 
@@ -173,7 +179,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   /* return the nth offset relative to the base offset */
   private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
   
-  /* return the nth physical position */
+  /* return the nth physical offset */
   private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
   
   /**
@@ -252,7 +258,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
     inLock(lock) {
       this.size.set(entries)
       mmap.position(this.size.get * 8)
-      this.lastOffset = readLastEntry.offset
+      this.lastOffset = readLastOffset
     }
   }
   
@@ -345,20 +351,6 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   }
   
   /**
-   * Do a basic sanity check on this index to detect obvious problems
-   * @throw IllegalArgumentException if any problems are found
-   */
-  def sanityCheck() {
-    require(entries == 0 || lastOffset > baseOffset,
-            "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
-            .format(file.getAbsolutePath, lastOffset, baseOffset))
-      val len = file.length()
-      require(len % 8 == 0, 
-              "Index file " + file.getAbsolutePath + " is corrupt, found " + len +
-              " bytes which is not positive or not a multiple of 8.")
-  }
-  
-  /**
    * Round a number to the greatest exact multiple of the given factor less than the given number.
    * E.g. roundToExactMultiple(67, 8) == 64
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 33b7360..cc6f1eb 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,7 +17,7 @@
 package kafka.server
 
 import kafka.utils.ZkUtils._
-import kafka.utils.{Utils, SystemTime, Logging}
+import kafka.utils.{Json, Utils, SystemTime, Logging}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
@@ -49,9 +49,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
 
   def elect: Boolean = {
     val timestamp = SystemTime.milliseconds.toString
-    val electString =
-      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
-        ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
+    val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
 
     try {
       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index c9ca95f..a89b046 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -446,65 +446,6 @@ object Utils extends Logging {
   def nullOrEmpty(s: String): Boolean = s == null || s.equals("")
 
   /**
-   * Merge JSON fields of the format "key" : value/object/array.
-   */
-  def mergeJsonFields(objects: Seq[String]): String = {
-    val builder = new StringBuilder
-    builder.append("{ ")
-    builder.append(objects.sorted.map(_.trim).mkString(", "))
-    builder.append(" }")
-    builder.toString
-  }
-
- /**
-   * Format a Map[String, String] as JSON object.
-   */
-  def mapToJsonFields(jsonDataMap: Map[String, String], valueInQuotes: Boolean): Seq[String] = {
-    val jsonFields: mutable.ListBuffer[String] = ListBuffer()
-    val builder = new StringBuilder
-    for ((key, value) <- jsonDataMap.toList.sorted) {
-      builder.append("\"" + key + "\":")
-      if (valueInQuotes)
-        builder.append("\"" + value + "\"")
-      else
-        builder.append(value)
-      jsonFields += builder.toString
-      builder.clear()
-    }
-    jsonFields
-  }
-
-  /**
-   * Format a Map[String, String] as JSON object.
-   */
-  def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = {
-    mergeJsonFields(mapToJsonFields(jsonDataMap, valueInQuotes))
-  }
-
-   /**
-   * Format a Seq[String] as JSON array.
-   */
-  def seqToJson(jsonData: Seq[String], valueInQuotes: Boolean): String = {
-    val builder = new StringBuilder
-    builder.append("[ ")
-    if (valueInQuotes)
-      builder.append(jsonData.map("\"" + _ + "\"").mkString(", "))
-    else
-      builder.append(jsonData.mkString(", "))
-    builder.append(" ]")
-    builder.toString
-  }
-
-  /**
-   * Format a Map[String, Seq[Int]] as JSON
-   */
-
-  def mapWithSeqValuesToJson(jsonDataMap: Map[String, Seq[Int]]): String = {
-    mergeJsonFields(mapToJsonFields(jsonDataMap.map(e => (e._1 -> seqToJson(e._2.map(_.toString), valueInQuotes = false))),
-                                    valueInQuotes = false))
-  }
-
-  /**
    * Create a circular (looping) iterator over a collection.
    * @param coll An iterable over the underlying collection.
    * @return A circular iterator over the collection.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 856d136..73902b2 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -32,10 +32,11 @@ import kafka.common.{KafkaException, NoEpochForPartitionException}
 import kafka.controller.ReassignedPartitionsContext
 import kafka.controller.PartitionAndReplica
 import kafka.controller.KafkaController
-import scala.Some
+import scala.{collection, Some}
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
 import kafka.utils.Utils.inLock
+import scala.collection
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -192,11 +193,8 @@ object ZkUtils extends Logging {
 
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
-    val timestamp = "\"" + SystemTime.milliseconds.toString + "\""
-    val brokerInfo =
-      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
-                             Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp),
-                                                   valueInQuotes = false))
+    val timestamp = SystemTime.milliseconds.toString
+    val brokerInfo = Json.encode(Map("version" -> 1, "host" -> host, "port" -> port, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
     val expectedBroker = new Broker(id, host, port)
 
     try {
@@ -219,18 +217,17 @@ object ZkUtils extends Logging {
     topicDirs.consumerOwnerDir + "/" + partition
   }
 
+
   def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
-    val isrInfo = Utils.seqToJson(leaderAndIsr.isr.map(_.toString), valueInQuotes = false)
-    Utils.mapToJson(Map("version" -> 1.toString, "leader" -> leaderAndIsr.leader.toString, "leader_epoch" -> leaderAndIsr.leaderEpoch.toString,
-                        "controller_epoch" -> controllerEpoch.toString, "isr" -> isrInfo), valueInQuotes = false)
+    Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
+                    "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
   }
 
   /**
    * Get JSON partition to replica map from zookeeper.
    */
-  def replicaAssignmentZkdata(map: Map[String, Seq[Int]]): String = {
-    val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map)
-    Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonReplicaAssignmentMap), valueInQuotes = false)
+  def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = {
+    Json.encode(Map("version" -> 1, "partitions" -> map))
   }
 
   /**
@@ -656,16 +653,8 @@ object ZkUtils extends Logging {
   }
 
   def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
-    var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
-    for (p <- partitionsToBeReassigned) {
-      val jsonReplicasData = Utils.seqToJson(p._2.map(_.toString), valueInQuotes = false)
-      val jsonTopicData = Utils.mapToJsonFields(Map("topic" -> p._1.topic), valueInQuotes = true)
-      val jsonPartitionData = Utils.mapToJsonFields(Map("partition" -> p._1.partition.toString, "replicas" -> jsonReplicasData),
-                                                    valueInQuotes = false)
-      jsonPartitionsData += Utils.mergeJsonFields(jsonTopicData ++ jsonPartitionData)
-    }
-    Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> Utils.seqToJson(jsonPartitionsData.toSeq, valueInQuotes = false)),
-                    valueInQuotes = false)
+    Json.encode(Map("version" -> 1, "partitions" -> partitionsToBeReassigned.map(e => Map("topic" -> e._1.topic, "partition" -> e._1.partition,
+                                                                                          "replicas" -> e._2))))
   }
 
   def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 6b76037..5f2c2e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -212,14 +212,15 @@ class LogSegmentTest extends JUnit3Suite {
    */
   @Test
   def testRecoveryWithCorruptMessage() {
+    val rand = new Random(1)
     val messagesAppended = 20
     for(iteration <- 0 until 10) {
       val seg = createSegment(0)
       for(i <- 0 until messagesAppended)
         seg.append(i, messages(i, i.toString))
-      val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
+      val offsetToBeginCorruption = rand.nextInt(messagesAppended)
       // start corrupting somewhere in the middle of the chosen record all the way to the end
-      val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15)
+      val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15)
       TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position)
       seg.recover(64*1024)
       assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1da1393..1571f1e 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -592,29 +592,29 @@ class LogTest extends JUnitSuite {
     val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
     val set = TestUtils.singleMessageSet("test".getBytes())
     val recoveryPoint = 50L
-    for(iteration <- 0 until 50) {
+    for(iteration <- 0 until 10) {
       // create a log and write some messages to it
-      logDir.mkdirs()
       var log = new Log(logDir,
                         config,
                         recoveryPoint = 0L,
                         time.scheduler,
                         time)
-      val numMessages = 50 + TestUtils.random.nextInt(50)
-      for(i <- 0 until numMessages)
+      for(i <- 0 until 100)
         log.append(set)
-      val messages = log.logSegments.flatMap(_.log.iterator.toList)
+      val seg = log.logSegments(0, recoveryPoint).last
+      val index = seg.index
+      val messages = seg.log
+      val filePosition = messages.searchFor(recoveryPoint, 0).position
+      val indexPosition = index.lookup(recoveryPoint).position
       log.close()
       
-      // corrupt index and log by appending random bytes
-      TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1)
-      TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
+      // corrupt file
+      TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition)
+      TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition)
       
       // attempt recovery
       log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
-      assertEquals(numMessages, log.logEndOffset)
-      assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList))
-      Utils.rm(logDir)
+      assertEquals(recoveryPoint, log.logEndOffset)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d88b6c3..777b315 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -518,15 +518,9 @@ object TestUtils extends Logging {
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
     val file = new RandomAccessFile(fileName, "rw")
     file.seek(position)
+    val rand = new Random
     for(i <- 0 until size)
-      file.writeByte(random.nextInt(255))
-    file.close()
-  }
-  
-  def appendNonsenseToFile(fileName: File, size: Int) {
-    val file = new FileOutputStream(fileName, true)
-    for(i <- 0 until size)
-      file.write(random.nextInt(255))
+      file.writeByte(rand.nextInt(255))
     file.close()
   }