You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/11/20 19:05:02 UTC
git commit: kafka-1135;
Code cleanup - use Json.encode() to write json data to zk;
patched by Swapnil Ghike; reviewed by Neha Narkhede, Guozhang Wang and Jun Rao
Updated Branches:
refs/heads/trunk 440e45e51 -> 9b0776d15
kafka-1135; Code cleanup - use Json.encode() to write json data to zk; patched by Swapnil Ghike; reviewed by Neha Narkhede, Guozhang Wang and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b0776d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b0776d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b0776d1
Branch: refs/heads/trunk
Commit: 9b0776d157afd9eacddb84a99f2420fa9c0d505b
Parents: 440e45e
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Wed Nov 20 10:05:40 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 20 10:05:40 2013 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 2 +-
.../PreferredReplicaLeaderElectionCommand.scala | 9 +--
.../scala/kafka/api/LeaderAndIsrRequest.scala | 6 +-
.../main/scala/kafka/consumer/TopicCount.scala | 23 +-------
.../consumer/ZookeeperConsumerConnector.scala | 8 +--
.../kafka/controller/KafkaController.scala | 2 +-
.../main/scala/kafka/log/FileMessageSet.scala | 2 -
core/src/main/scala/kafka/log/Log.scala | 28 +++++-----
core/src/main/scala/kafka/log/LogManager.scala | 2 -
core/src/main/scala/kafka/log/OffsetIndex.scala | 40 ++++++-------
.../kafka/server/ZookeeperLeaderElector.scala | 6 +-
core/src/main/scala/kafka/utils/Utils.scala | 59 --------------------
core/src/main/scala/kafka/utils/ZkUtils.scala | 33 ++++-------
.../scala/unit/kafka/log/LogSegmentTest.scala | 5 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 22 ++++----
.../test/scala/unit/kafka/utils/TestUtils.scala | 10 +---
16 files changed, 72 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 8ff4bd5..a167756 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -182,7 +182,7 @@ object AdminUtils extends Logging {
private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
try {
val zkPath = ZkUtils.getTopicPath(topic)
- val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2)))
+ val jsonPartitionData = ZkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
if (!update) {
info("Topic creation " + jsonPartitionData.toString)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 26beb96..9b3c6ae 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -89,13 +89,8 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
def writePreferredReplicaElectionData(zkClient: ZkClient,
partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
- var partitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
- for (p <- partitionsUndergoingPreferredReplicaElection) {
- partitionsData += Utils.mergeJsonFields(Utils.mapToJsonFields(Map("topic" -> p.topic), valueInQuotes = true) ++
- Utils.mapToJsonFields(Map("partition" -> p.partition.toString), valueInQuotes = false))
- }
- val jsonPartitionsData = Utils.seqToJson(partitionsData, valueInQuotes = false)
- val jsonData = Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonPartitionsData), valueInQuotes = false)
+ val partitionsList = partitionsUndergoingPreferredReplicaElection.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
+ val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
try {
ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
info("Created preferred replica election path with %s".format(jsonData))
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 981d2bb..3401afa 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -37,11 +37,7 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int
def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
override def toString(): String = {
- val jsonDataMap = new collection.mutable.HashMap[String, String]
- jsonDataMap.put("leader", leader.toString)
- jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
- jsonDataMap.put("ISR", isr.mkString(","))
- Utils.mapToJson(jsonDataMap, valueInQuotes = true)
+ Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index a3eb53e..e332633 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -25,7 +25,7 @@ import kafka.common.KafkaException
private[kafka] trait TopicCount {
def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
- def dbString: String
+ def getTopicCountMap: Map[String, Int]
def pattern: String
protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
@@ -111,24 +111,7 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
}
}
- /**
- * return json of
- * { "topic1" : 4,
- * "topic2" : 4 }
- */
- def dbString = {
- val builder = new StringBuilder
- builder.append("{ ")
- var i = 0
- for ( (topic, nConsumers) <- topicCountMap) {
- if (i > 0)
- builder.append(",")
- builder.append("\"" + topic + "\": " + nConsumers)
- i += 1
- }
- builder.append(" }")
- builder.toString()
- }
+ def getTopicCountMap = topicCountMap
def pattern = TopicCount.staticPattern
}
@@ -142,7 +125,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient,
makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
}
- def dbString = "{ \"%s\" : %d }".format(topicFilter.regex, numStreams)
+ def getTopicCountMap = Map(topicFilter.regex -> numStreams)
def pattern: String = {
topicFilter match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c0350cd..6d0cfa6 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -220,11 +220,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
info("begin registering consumer " + consumerIdString + " in ZK")
val timestamp = SystemTime.milliseconds.toString
- val consumerRegistrationInfo =
- Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
- ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
+ val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
+ "timestamp" -> timestamp))
- createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, (consumerZKString, consumer) => true, config.zkSessionTimeoutMs)
+ createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null,
+ (consumerZKString, consumer) => true, config.zkSessionTimeoutMs)
info("end registering consumer " + consumerIdString + " in ZK")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 88792c2..4c319ab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -722,7 +722,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
try {
val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic)
- val jsonPartitionMap = ZkUtils.replicaAssignmentZkdata(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
+ val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index e1f8b97..6c099da 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -123,8 +123,6 @@ class FileMessageSet private[kafka](@volatile var file: File,
if(offset >= targetOffset)
return OffsetPosition(offset, position)
val messageSize = buffer.getInt()
- if(messageSize < Message.MessageOverhead)
- throw new IllegalStateException("Invalid message size: " + messageSize)
position += MessageSet.LogOverhead + messageSize
}
null
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 1883a53..9205128 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -155,19 +155,26 @@ class Log(val dir: File,
activeSegment.index.resize(config.maxIndexSize)
}
- // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
- for (s <- logSegments)
- s.index.sanityCheck()
+ // sanity check the index file of every segment, if it's empty or its last offset is greater than its base offset.
+ for (s <- logSegments) {
+ require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset,
+ "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
+ .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
+ }
}
private def recoverLog() {
- // if we have the clean shutdown marker, skip recovery
- if(hasCleanShutdownFile) {
- this.recoveryPoint = activeSegment.nextOffset
+ val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
+ val needsRecovery = !(new File(dir.getParentFile, CleanShutdownFile)).exists()
+ if(!needsRecovery) {
+ this.recoveryPoint = lastOffset
+ return
+ }
+ if(lastOffset <= this.recoveryPoint) {
+ info("Log '%s' is fully intact, skipping recovery".format(name))
+ this.recoveryPoint = lastOffset
return
}
-
- // okay we need to actually recover this log
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
while(unflushed.hasNext) {
val curr = unflushed.next
@@ -189,11 +196,6 @@ class Log(val dir: File,
}
}
}
-
- /**
- * Check if we have the "clean shutdown" file
- */
- private def hasCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists()
/**
* The number of segments in the log.
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 81be88a..390b759 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -175,8 +175,6 @@ class LogManager(val logDirs: Array[File],
allLogs.foreach(_.close())
// update the last flush point
checkpointRecoveryPointOffsets()
- // mark that the shutdown was clean by creating the clean shutdown marker file
- logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()))
} finally {
// regardless of whether the close succeeded, we need to unlock the data directories
dirLocks.foreach(_.destroy())
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 96571b3..2f4e303 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -69,8 +69,12 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
raf.setLength(roundToExactMultiple(maxIndexSize, 8))
}
+ val len = raf.length()
+ if(len < 0 || len % 8 != 0)
+ throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len +
+ " bytes which is not positive or not a multiple of 8.")
+
/* memory-map the file */
- val len = raf.length()
val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
/* set the position in the index for the next entry */
@@ -95,20 +99,22 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
var maxEntries = mmap.limit / 8
/* the last offset in the index */
- var lastOffset = readLastEntry.offset
+ var lastOffset = readLastOffset()
debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
.format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
/**
- * The last entry in the index
+ * The last offset written to the index
*/
- def readLastEntry(): OffsetPosition = {
+ private def readLastOffset(): Long = {
inLock(lock) {
- size.get match {
- case 0 => OffsetPosition(baseOffset, 0)
- case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1))
- }
+ val offset =
+ size.get match {
+ case 0 => 0
+ case s => relativeOffset(this.mmap, s-1)
+ }
+ baseOffset + offset
}
}
@@ -173,7 +179,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
/* return the nth offset relative to the base offset */
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
- /* return the nth physical position */
+ /* return the nth physical offset */
private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
/**
@@ -252,7 +258,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
inLock(lock) {
this.size.set(entries)
mmap.position(this.size.get * 8)
- this.lastOffset = readLastEntry.offset
+ this.lastOffset = readLastOffset
}
}
@@ -345,20 +351,6 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
}
/**
- * Do a basic sanity check on this index to detect obvious problems
- * @throw IllegalArgumentException if any problems are found
- */
- def sanityCheck() {
- require(entries == 0 || lastOffset > baseOffset,
- "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
- .format(file.getAbsolutePath, lastOffset, baseOffset))
- val len = file.length()
- require(len % 8 == 0,
- "Index file " + file.getAbsolutePath + " is corrupt, found " + len +
- " bytes which is not positive or not a multiple of 8.")
- }
-
- /**
* Round a number to the greatest exact multiple of the given factor less than the given number.
* E.g. roundToExactMultiple(67, 8) == 64
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 33b7360..cc6f1eb 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,7 +17,7 @@
package kafka.server
import kafka.utils.ZkUtils._
-import kafka.utils.{Utils, SystemTime, Logging}
+import kafka.utils.{Json, Utils, SystemTime, Logging}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.IZkDataListener
import kafka.controller.ControllerContext
@@ -49,9 +49,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
def elect: Boolean = {
val timestamp = SystemTime.milliseconds.toString
- val electString =
- Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
- ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
+ val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
try {
createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index c9ca95f..a89b046 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -446,65 +446,6 @@ object Utils extends Logging {
def nullOrEmpty(s: String): Boolean = s == null || s.equals("")
/**
- * Merge JSON fields of the format "key" : value/object/array.
- */
- def mergeJsonFields(objects: Seq[String]): String = {
- val builder = new StringBuilder
- builder.append("{ ")
- builder.append(objects.sorted.map(_.trim).mkString(", "))
- builder.append(" }")
- builder.toString
- }
-
- /**
- * Format a Map[String, String] as JSON object.
- */
- def mapToJsonFields(jsonDataMap: Map[String, String], valueInQuotes: Boolean): Seq[String] = {
- val jsonFields: mutable.ListBuffer[String] = ListBuffer()
- val builder = new StringBuilder
- for ((key, value) <- jsonDataMap.toList.sorted) {
- builder.append("\"" + key + "\":")
- if (valueInQuotes)
- builder.append("\"" + value + "\"")
- else
- builder.append(value)
- jsonFields += builder.toString
- builder.clear()
- }
- jsonFields
- }
-
- /**
- * Format a Map[String, String] as JSON object.
- */
- def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = {
- mergeJsonFields(mapToJsonFields(jsonDataMap, valueInQuotes))
- }
-
- /**
- * Format a Seq[String] as JSON array.
- */
- def seqToJson(jsonData: Seq[String], valueInQuotes: Boolean): String = {
- val builder = new StringBuilder
- builder.append("[ ")
- if (valueInQuotes)
- builder.append(jsonData.map("\"" + _ + "\"").mkString(", "))
- else
- builder.append(jsonData.mkString(", "))
- builder.append(" ]")
- builder.toString
- }
-
- /**
- * Format a Map[String, Seq[Int]] as JSON
- */
-
- def mapWithSeqValuesToJson(jsonDataMap: Map[String, Seq[Int]]): String = {
- mergeJsonFields(mapToJsonFields(jsonDataMap.map(e => (e._1 -> seqToJson(e._2.map(_.toString), valueInQuotes = false))),
- valueInQuotes = false))
- }
-
- /**
* Create a circular (looping) iterator over a collection.
* @param coll An iterable over the underlying collection.
* @return A circular iterator over the collection.
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 856d136..73902b2 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -32,10 +32,11 @@ import kafka.common.{KafkaException, NoEpochForPartitionException}
import kafka.controller.ReassignedPartitionsContext
import kafka.controller.PartitionAndReplica
import kafka.controller.KafkaController
-import scala.Some
+import scala.{collection, Some}
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.common.TopicAndPartition
import kafka.utils.Utils.inLock
+import scala.collection
object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
@@ -192,11 +193,8 @@ object ZkUtils extends Logging {
def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
- val timestamp = "\"" + SystemTime.milliseconds.toString + "\""
- val brokerInfo =
- Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
- Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp),
- valueInQuotes = false))
+ val timestamp = SystemTime.milliseconds.toString
+ val brokerInfo = Json.encode(Map("version" -> 1, "host" -> host, "port" -> port, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
val expectedBroker = new Broker(id, host, port)
try {
@@ -219,18 +217,17 @@ object ZkUtils extends Logging {
topicDirs.consumerOwnerDir + "/" + partition
}
+
def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
- val isrInfo = Utils.seqToJson(leaderAndIsr.isr.map(_.toString), valueInQuotes = false)
- Utils.mapToJson(Map("version" -> 1.toString, "leader" -> leaderAndIsr.leader.toString, "leader_epoch" -> leaderAndIsr.leaderEpoch.toString,
- "controller_epoch" -> controllerEpoch.toString, "isr" -> isrInfo), valueInQuotes = false)
+ Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
+ "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
}
/**
* Get JSON partition to replica map from zookeeper.
*/
- def replicaAssignmentZkdata(map: Map[String, Seq[Int]]): String = {
- val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map)
- Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonReplicaAssignmentMap), valueInQuotes = false)
+ def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = {
+ Json.encode(Map("version" -> 1, "partitions" -> map))
}
/**
@@ -656,16 +653,8 @@ object ZkUtils extends Logging {
}
def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
- var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
- for (p <- partitionsToBeReassigned) {
- val jsonReplicasData = Utils.seqToJson(p._2.map(_.toString), valueInQuotes = false)
- val jsonTopicData = Utils.mapToJsonFields(Map("topic" -> p._1.topic), valueInQuotes = true)
- val jsonPartitionData = Utils.mapToJsonFields(Map("partition" -> p._1.partition.toString, "replicas" -> jsonReplicasData),
- valueInQuotes = false)
- jsonPartitionsData += Utils.mergeJsonFields(jsonTopicData ++ jsonPartitionData)
- }
- Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> Utils.seqToJson(jsonPartitionsData.toSeq, valueInQuotes = false)),
- valueInQuotes = false)
+ Json.encode(Map("version" -> 1, "partitions" -> partitionsToBeReassigned.map(e => Map("topic" -> e._1.topic, "partition" -> e._1.partition,
+ "replicas" -> e._2))))
}
def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 6b76037..5f2c2e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -212,14 +212,15 @@ class LogSegmentTest extends JUnit3Suite {
*/
@Test
def testRecoveryWithCorruptMessage() {
+ val rand = new Random(1)
val messagesAppended = 20
for(iteration <- 0 until 10) {
val seg = createSegment(0)
for(i <- 0 until messagesAppended)
seg.append(i, messages(i, i.toString))
- val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
+ val offsetToBeginCorruption = rand.nextInt(messagesAppended)
// start corrupting somewhere in the middle of the chosen record all the way to the end
- val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15)
+ val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15)
TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position)
seg.recover(64*1024)
assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1da1393..1571f1e 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -592,29 +592,29 @@ class LogTest extends JUnitSuite {
val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
val set = TestUtils.singleMessageSet("test".getBytes())
val recoveryPoint = 50L
- for(iteration <- 0 until 50) {
+ for(iteration <- 0 until 10) {
// create a log and write some messages to it
- logDir.mkdirs()
var log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
time)
- val numMessages = 50 + TestUtils.random.nextInt(50)
- for(i <- 0 until numMessages)
+ for(i <- 0 until 100)
log.append(set)
- val messages = log.logSegments.flatMap(_.log.iterator.toList)
+ val seg = log.logSegments(0, recoveryPoint).last
+ val index = seg.index
+ val messages = seg.log
+ val filePosition = messages.searchFor(recoveryPoint, 0).position
+ val indexPosition = index.lookup(recoveryPoint).position
log.close()
- // corrupt index and log by appending random bytes
- TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1)
- TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
+ // corrupt file
+ TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition)
+ TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition)
// attempt recovery
log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
- assertEquals(numMessages, log.logEndOffset)
- assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList))
- Utils.rm(logDir)
+ assertEquals(recoveryPoint, log.logEndOffset)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b0776d1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d88b6c3..777b315 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -518,15 +518,9 @@ object TestUtils extends Logging {
def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
val file = new RandomAccessFile(fileName, "rw")
file.seek(position)
+ val rand = new Random
for(i <- 0 until size)
- file.writeByte(random.nextInt(255))
- file.close()
- }
-
- def appendNonsenseToFile(fileName: File, size: Int) {
- val file = new FileOutputStream(fileName, true)
- for(i <- 0 until size)
- file.write(random.nextInt(255))
+ file.writeByte(rand.nextInt(255))
file.close()
}