You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:21:59 UTC
[37/37] git commit: merge from 0.8 and resolve conflicts
Updated Branches:
refs/heads/trunk 82b11aa0d -> 4f2742d60
merge from 0.8 and resolve conflicts
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4f2742d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4f2742d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4f2742d6
Branch: refs/heads/trunk
Commit: 4f2742d60d16f5ba468aa66d2c3ed7aa37479dce
Parents: 82b11aa 92ecebe
Author: Jun Rao <ju...@gmail.com>
Authored: Sun Mar 3 20:20:41 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Mar 3 20:20:41 2013 -0800
----------------------------------------------------------------------
bin/windows/kafka-run-class.bat | 33 +-
config/log4j.properties | 3 +-
contrib/hadoop-producer/README.md | 119 ++----
.../java/kafka/bridge/examples/TextPublisher.java | 14 +-
.../kafka/bridge/hadoop/KafkaOutputFormat.java | 103 ++---
.../kafka/bridge/hadoop/KafkaRecordWriter.java | 35 +-
.../java/kafka/bridge/pig/AvroKafkaStorage.java | 8 +-
core/src/main/scala/kafka/admin/AdminUtils.scala | 77 ++--
.../kafka/admin/CheckReassignmentStatus.scala | 2 +-
.../scala/kafka/admin/CreateTopicCommand.scala | 10 +-
.../PreferredReplicaLeaderElectionCommand.scala | 8 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 5 +-
.../main/scala/kafka/admin/ShutdownBroker.scala | 36 +-
core/src/main/scala/kafka/api/FetchRequest.scala | 8 +-
core/src/main/scala/kafka/api/FetchResponse.scala | 4 +-
.../main/scala/kafka/api/LeaderAndIsrRequest.scala | 2 +-
.../src/main/scala/kafka/api/ProducerRequest.scala | 13 +-
.../main/scala/kafka/api/ProducerResponse.scala | 2 -
.../main/scala/kafka/api/StopReplicaRequest.scala | 2 +-
.../scala/kafka/api/TopicMetadataRequest.scala | 2 +-
core/src/main/scala/kafka/cluster/Broker.scala | 18 +-
core/src/main/scala/kafka/cluster/Partition.scala | 5 +-
.../kafka/common/LeaderNotAvailableException.scala | 7 +-
.../kafka/consumer/ConsumerFetcherThread.scala | 3 +-
.../scala/kafka/consumer/ConsumerTopicStats.scala | 8 +-
.../consumer/FetchRequestAndResponseStats.scala | 8 +-
.../src/main/scala/kafka/consumer/TopicCount.scala | 127 +++---
.../consumer/ZookeeperConsumerConnector.scala | 9 +-
.../scala/kafka/controller/KafkaController.scala | 5 +-
.../kafka/controller/PartitionStateMachine.scala | 5 +-
core/src/main/scala/kafka/log/FileMessageSet.scala | 16 +-
core/src/main/scala/kafka/log/Log.scala | 30 +-
core/src/main/scala/kafka/log/OffsetIndex.scala | 36 +-
.../scala/kafka/message/ByteBufferMessageSet.scala | 26 +-
.../main/scala/kafka/network/RequestChannel.scala | 12 +-
.../main/scala/kafka/network/SocketServer.scala | 33 +-
.../main/scala/kafka/network/Transmission.scala | 22 +-
.../scala/kafka/producer/BrokerPartitionInfo.scala | 5 +-
.../scala/kafka/producer/ConsoleProducer.scala | 10 +-
.../scala/kafka/producer/KafkaLog4jAppender.scala | 13 +-
core/src/main/scala/kafka/producer/Producer.scala | 11 +-
.../kafka/producer/ProducerRequestStats.scala | 8 +-
.../main/scala/kafka/producer/ProducerStats.scala | 1 -
.../scala/kafka/producer/ProducerTopicStats.scala | 11 +-
.../main/scala/kafka/producer/SyncProducer.scala | 17 +-
.../kafka/producer/async/DefaultEventHandler.scala | 52 ++-
.../kafka/producer/async/ProducerSendThread.scala | 2 +-
.../scala/kafka/server/AbstractFetcherThread.scala | 27 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 37 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 3 +-
.../scala/kafka/server/KafkaRequestHandler.scala | 6 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 6 +-
.../scala/kafka/server/KafkaServerStartable.scala | 2 +-
.../scala/kafka/server/ReplicaFetcherThread.scala | 43 +-
.../main/scala/kafka/server/ReplicaManager.scala | 13 +
.../main/scala/kafka/server/RequestPurgatory.scala | 32 +-
.../scala/kafka/tools/ConsumerOffsetChecker.scala | 32 +-
.../main/scala/kafka/tools/ExportZkOffsets.scala | 10 +-
.../main/scala/kafka/tools/KafkaMigrationTool.java | 361 ++++++++++-----
.../kafka/tools/VerifyConsumerRebalance.scala | 5 +-
.../scala/kafka/utils/ShutdownableThread.scala | 5 +-
core/src/main/scala/kafka/utils/Utils.scala | 68 ++-
core/src/main/scala/kafka/utils/ZkUtils.scala | 151 +++----
.../test/scala/unit/kafka/admin/AdminTest.scala | 77 ++--
.../api/RequestResponseSerializationTest.scala | 2 +-
.../unit/kafka/integration/PrimitiveApiTest.scala | 55 ++-
.../integration/ProducerConsumerTestHarness.scala | 14 +-
.../scala/unit/kafka/log/OffsetIndexTest.scala | 8 +-
.../unit/kafka/log4j/KafkaLog4jAppenderTest.scala | 7 +-
.../unit/kafka/network/SocketServerTest.scala | 44 ++-
.../unit/kafka/producer/AsyncProducerTest.scala | 13 +-
.../scala/unit/kafka/producer/ProducerTest.scala | 3 +-
.../unit/kafka/producer/SyncProducerTest.scala | 70 ++--
.../scala/unit/kafka/server/LogRecoveryTest.scala | 2 +-
.../unit/kafka/server/ServerShutdownTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 25 +-
.../test/scala/unit/kafka/utils/UtilsTest.scala | 5 +-
.../scala/kafka/perf/ProducerPerformance.scala | 2 +-
project/build/KafkaProject.scala | 2 +-
.../config/mirror_consumer.properties | 2 +-
.../testcase_5003/testcase_5003_properties.json | 2 +-
81 files changed, 1200 insertions(+), 922 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/config/log4j.properties
----------------------------------------------------------------------
diff --cc config/log4j.properties
index b36d3e0,5692da0..1891f38
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@@ -48,14 -42,15 +48,15 @@@ log4j.appender.cleanerAppender.layout.C
#log4j.logger.kafka.perf=DEBUG, kafkaAppender
#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
- log4j.logger.kafka=INFO
+ log4j.logger.kafka=INFO, kafkaAppender
-log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false
+ #log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
-log4j.logger.kafka.request.logger=TRACE, requestAppender
+log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false
log4j.logger.kafka.controller=TRACE, stateChangeAppender
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/FileMessageSet.scala
index a74abfe,ce27a19..0eef33e
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@@ -100,15 -71,13 +100,15 @@@ class FileMessageSet private[kafka](@vo
}
/**
- * Search forward for the file position of the last offset that is great than or equal to the target offset
+ * Search forward for the file position of the last offset that is greater than or equal to the target offset
* and return its physical position. If no such offsets are found, return null.
+ * @param targetOffset The offset to search for.
+ * @param startingPosition The starting position in the file to begin searching from.
*/
- private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
+ def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
var position = startingPosition
val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
- val size = _size.get()
+ val size = sizeInBytes()
while(position + MessageSet.LogOverhead < size) {
buffer.rewind()
channel.read(buffer, position)
@@@ -126,27 -95,25 +126,37 @@@
}
/**
- * Write some of this set to the given channel, return the amount written
+ * Write some of this set to the given channel.
+ * @param destChannel The channel to write to.
+ * @param writePosition The position in the message set to begin writing from.
+ * @param size The maximum number of bytes to write
+ * @return The number of bytes actually written.
*/
- def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int =
- channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt
+ def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
+ // Ensure that the underlying size has not changed.
- val newSize = scala.math.min(channel.size().toInt, limit) - start
++ val newSize = math.min(channel.size().toInt, end) - start
+ if (newSize < _size.get()) {
+ throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
+ .format(file.getAbsolutePath, _size.get(), newSize))
+ }
- val bytesTransferred = channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
++ val bytesTransferred = channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt
+ trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
- + " bytes requested for transfer : " + scala.math.min(size, sizeInBytes))
++ + " bytes requested for transfer : " + math.min(size, sizeInBytes))
+ bytesTransferred
+ }
/**
+ * Get a shallow iterator over the messages in the set.
+ */
+ override def iterator() = iterator(Int.MaxValue)
+
+ /**
* Get an iterator over the messages in the set. We only do shallow iteration here.
+ * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory.
+ * If we encounter a message larger than this we throw an InvalidMessageException.
+ * @return The iterator.
*/
- override def iterator: Iterator[MessageAndOffset] = {
+ def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
new IteratorTemplate[MessageAndOffset] {
var location = start
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/Log.scala
index ac12b74,b2a7170..0cc03bb
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@@ -63,17 -119,16 +63,19 @@@ class Log(val dir: File
private val unflushed = new AtomicInteger(0)
/* last time it was flushed */
- private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
+ private val lastflushedTime = new AtomicLong(time.milliseconds)
/* the actual segments of the log */
- private[log] val segments: SegmentList[LogSegment] = loadSegments()
+ private val segments: ConcurrentNavigableMap[Long,LogSegment] = loadSegments()
+
+ /* The number of times the log has been truncated */
+ private val truncates = new AtomicInteger(0)
/* Calculate the offset of the next message */
- private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
-
+ private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
+
+ debug("Completed load of log %s with log end offset %d".format(name, logEndOffset))
+
newGauge(name + "-" + "NumLogSegments",
new Gauge[Int] { def getValue = numberOfSegments })
@@@ -148,36 -169,74 +150,44 @@@
}
if(logSegments.size == 0) {
- // no existing segments, create a new mutable segment
- logSegments.add(new LogSegment(dir = dir,
+ // no existing segments, create a new mutable segment beginning at offset 0
+ logSegments.put(0,
+ new LogSegment(dir = dir,
startOffset = 0,
- indexIntervalBytes = indexIntervalBytes,
- maxIndexSize = maxIndexSize))
+ indexIntervalBytes = config.indexInterval,
+ maxIndexSize = config.maxIndexSize))
} else {
- // there is at least one existing segment, validate and recover them/it
- // sort segments into ascending order for fast searching
- Collections.sort(logSegments, new Comparator[LogSegment] {
- def compare(s1: LogSegment, s2: LogSegment): Int = {
- if(s1.start == s2.start) 0
- else if(s1.start < s2.start) -1
- else 1
- }
- })
-
- // reset the index size of the last (current active) log segment to its maximum value
- logSegments.get(logSegments.size() - 1).index.resize(maxIndexSize)
-
- // run recovery on the last segment if necessary
- if(needsRecovery)
- recoverSegment(logSegments.get(logSegments.size - 1))
+ // reset the index size of the currently active log segment to allow more entries
+ val active = logSegments.lastEntry.getValue
+ active.index.resize(config.maxIndexSize)
+
+ // run recovery on the active segment if necessary
+ if(needsRecovery) {
+ info("Recovering active segment of %s.".format(name))
+ active.recover(config.maxMessageSize)
+ }
}
+
- val segmentList = logSegments.toArray(new Array[LogSegment](logSegments.size))
+ // Check that the index file of every segment is either empty or has a last offset greater than its base offset.
- for (s <- segmentList) {
++ for (s <- asIterable(logSegments.values)) {
+ require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset,
+ "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
- .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
++ .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
+ }
+
- new SegmentList(segmentList)
+ logSegments
}
-
+
/**
- * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log.
+ * The number of segments in the log.
+ * Take care! This is an O(n) operation.
*/
- private def recoverSegment(segment: LogSegment) {
- info("Recovering log segment %s".format(segment.messageSet.file.getAbsolutePath))
- segment.index.truncate()
- var validBytes = 0
- var lastIndexEntry = 0
- val iter = segment.messageSet.iterator
- try {
- while(iter.hasNext) {
- val entry = iter.next
- entry.message.ensureValid()
- if(validBytes - lastIndexEntry > indexIntervalBytes) {
- segment.index.append(entry.offset, validBytes)
- lastIndexEntry = validBytes
- }
- validBytes += MessageSet.entrySize(entry.message)
- }
- } catch {
- case e: InvalidMessageException =>
- logger.warn("Found invalid messages in log " + name)
- }
- val truncated = segment.messageSet.sizeInBytes - validBytes
- if(truncated > 0)
- warn("Truncated " + truncated + " invalid bytes from the log " + name + ".")
- segment.messageSet.truncateTo(validBytes)
- }
-
+ def numberOfSegments: Int = segments.size
+
/**
- * The number of segments in the log
+ * The number of truncates that have occurred since the log was opened.
*/
- def numberOfSegments: Int = segments.view.length
+ def numberOfTruncates: Int = truncates.get
/**
* Close this log
@@@ -196,55 -255,76 +206,67 @@@
* This method will generally be responsible for assigning offsets to the messages,
* however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
*
- * Returns a tuple containing (first_offset, last_offset) for the newly appended of the message set,
- * or (-1,-1) if the message set is empty
+ * @param messages The message set to append
+ * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
+ *
+ * @throws KafkaStorageException If the append fails due to an I/O error.
+ *
+ * @return Information about the appended messages including the first and last offset
*/
- def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = {
- val messageSetInfo = analyzeAndValidateMessageSet(messages)
+ def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
+ val appendInfo = analyzeAndValidateMessageSet(messages)
// if we have any valid messages, append them to the log
- if(messageSetInfo.count == 0) {
- (-1L, -1L)
- } else {
- BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
- BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(messageSetInfo.count)
-
- // trim any invalid bytes or partial messages before appending it to the on-disk log
- var validMessages = trimInvalidBytes(messages)
+ if(appendInfo.count == 0)
+ return appendInfo
+
+ // trim any invalid bytes or partial messages before appending it to the on-disk log
+ var validMessages = trimInvalidBytes(messages)
- try {
- // they are valid, insert them in the log
- val offsets = lock synchronized {
- // maybe roll the log if this segment is full
- val segment = maybeRoll(segments.view.last)
+ try {
+ // they are valid, insert them in the log
+ lock synchronized {
+ // maybe roll the log if this segment is full
+ val segment = maybeRoll()
+ if(assignOffsets) {
// assign offsets to the messageset
- val offsets =
- if(assignOffsets) {
- val offsetCounter = new AtomicLong(nextOffset.get)
- val firstOffset = offsetCounter.get
- try {
- validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
- } catch {
- case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
- }
- val lastOffset = offsetCounter.get - 1
- (firstOffset, lastOffset)
- } else {
- require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)
- require(messageSetInfo.firstOffset >= nextOffset.get,
- "Attempt to append a message set beginning with offset %d to a log with log end offset %d."
- .format(messageSetInfo.firstOffset, nextOffset.get))
- (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
- }
-
- // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
- // happens with the new message size (after re-compression, if any)
- for(messageAndOffset <- validMessages.shallowIterator) {
- if(MessageSet.entrySize(messageAndOffset.message) > maxMessageSize)
- throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
- .format(MessageSet.entrySize(messageAndOffset.message), maxMessageSize))
+ appendInfo.firstOffset = nextOffset.get
+ val offset = new AtomicLong(nextOffset.get)
- validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
++ try {
++ validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
++ } catch {
++ case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
+ }
+ appendInfo.lastOffset = offset.get - 1
+ } else {
+ // we are taking the offsets we are given
+ if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
+ throw new IllegalArgumentException("Out of order offsets found in " + messages)
+ }
- // now append to the log
- trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s"
- .format(this.name, offsets._1, nextOffset.get(), validMessages))
- segment.append(offsets._1, validMessages)
-
- // advance the log end offset
- nextOffset.set(offsets._2 + 1)
-
- // return the offset at which the messages were appended
- offsets
++ // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
++ // happens with the new message size (after re-compression, if any)
++ for(messageAndOffset <- validMessages.shallowIterator) {
++ if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize)
++ throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
++ .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
+ }
++
+ // now append to the log
+ trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset))
+ segment.append(appendInfo.firstOffset, validMessages)
+ // increment the log end offset
+ nextOffset.set(appendInfo.lastOffset + 1)
+
// maybe flush the log and index
- maybeFlush(messageSetInfo.count)
+ maybeFlush(appendInfo.count)
- // return the first and last offset
- offsets
- } catch {
- case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
+ appendInfo
}
+ } catch {
+ case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
}
}
@@@ -259,21 -333,16 +281,20 @@@
/**
* Validate the following:
- * 1. each message matches its CRC
+ * <ol>
- * <li> each message is not too large
+ * <li> each message matches its CRC
+ * </ol>
*
* Also compute the following quantities:
- * 1. First offset in the message set
- * 2. Last offset in the message set
- * 3. Number of messages
- * 4. Whether the offsets are monotonically increasing
- * 5. Whether any compression codec is used (if many are used, then the last one is given)
+ * <ol>
+ * <li> First offset in the message set
+ * <li> Last offset in the message set
+ * <li> Number of messages
+ * <li> Whether the offsets are monotonically increasing
+ * <li> Whether any compression codec is used (if many are used, then the last one is given)
+ * </ol>
*/
- private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): MessageSetAppendInfo = {
+ private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
var messageCount = 0
var firstOffset, lastOffset = -1L
var codec: CompressionCodec = NoCompressionCodec
@@@ -288,12 -357,10 +309,9 @@@
// update the last offset seen
lastOffset = messageAndOffset.offset
- // check the validity of the message by checking CRC and message size
+ // check the validity of the message by checking CRC
val m = messageAndOffset.message
m.ensureValid()
- if(MessageSet.entrySize(m) > config.maxMessageSize)
- throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(m), config.maxMessageSize))
-
-
messageCount += 1;
val messageCodec = m.compressionCodec
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/OffsetIndex.scala
index 23e659f,e806da9..eff213e
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@@ -90,9 -90,12 +90,9 @@@ class OffsetIndex(@volatile var file: F
/* the last offset in the index */
var lastOffset = readLastOffset()
- info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d"
- .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset))
+ info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
+ .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
- /* the maximum number of entries this index can hold */
- def maxEntries = mmap.limit / 8
-
/**
* The last offset written to the index
*/
@@@ -132,14 -126,10 +132,14 @@@
/**
* Find the slot in which the largest offset less than or equal to the given
* target offset is stored.
- * Return -1 if the least entry in the index is larger than the target offset or the index is empty
+ *
+ * @param idx The index buffer
+ * @param targetOffset The offset to look for
+ *
+ * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
*/
private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
- // we only store the difference from the baseoffset so calculate that
+ // we only store the difference from the base offset so calculate that
val relOffset = targetOffset - baseOffset
// check if the index is empty
@@@ -207,9 -194,9 +206,9 @@@
def isFull: Boolean = entries >= this.maxEntries
/**
- * Truncate the entire index
+ * Truncate the entire index, deleting all entries
*/
- def truncate() = truncateTo(this.baseOffset)
+ def truncate() = truncateToEntries(0)
/**
* Remove all entries from the index which have an offset greater than or equal to the given offset.
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaApis.scala
index 5c5dbc9,cfabfc1..f30dca1
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@@ -21,11 -21,8 +21,10 @@@ import kafka.admin.{CreateTopicCommand
import kafka.api._
import kafka.message._
import kafka.network._
+import kafka.log._
+import kafka.utils.ZKGroupTopicDirs
import org.apache.log4j.Logger
import scala.collection._
- import kafka.network.RequestChannel.Response
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic._
import kafka.metrics.KafkaMetricsGroup
@@@ -190,12 -190,7 +195,12 @@@ class KafkaApis(val requestChannel: Req
try {
val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
val log = localReplica.log.get
- val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
+ val info = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
+
+ // update stats
+ BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(info.count)
- BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(info.count)
++ BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(info.count)
+
// we may need to increment high watermark since ISR could be down to 1
localReplica.partition.maybeIncrementLeaderHW(localReplica)
trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/ReplicaManager.scala
index 710c08b,f7fe0de..573601f
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@@ -50,7 -48,8 +50,8 @@@ class ReplicaManager(val config: KafkaC
val replicaFetcherManager = new ReplicaFetcherManager(config, this)
this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
- val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
+ val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
+ private var hwThreadInitialized = false
newGauge(
"LeaderCount",
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index 334c9c1,f3a5095..2804908
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@@ -39,24 -40,23 +40,24 @@@ import java.util.concurrent.atomic.Atom
/**
- * The kafka 07 to 08 online migration tool, it's used for migrating data from 07 to 08 cluster. Internally,
- * it's composed of a kafka 07 consumer and kafka 08 producer. The kafka 07 consumer consumes data from the
- * 07 cluster, and the kafka 08 producer produces data to the 08 cluster.
+ * This is a kafka 0.7 to 0.8 online migration tool used for migrating data from a 0.7 cluster to a 0.8 cluster. Internally,
+ * it's composed of a kafka 0.7 consumer and kafka 0.8 producer. The kafka 0.7 consumer consumes data from the
+ * 0.7 cluster, and the kafka 0.8 producer produces data to the 0.8 cluster.
*
- * The 07 consumer is loaded from kafka 07 jar using a "parent last, child first" java class loader.
- * Ordinary class loader is "parent first, child last", and kafka 08 and 07 both have classes for a lot of
- * class names like "kafka.consumer.Consumer", etc., so ordinary java URLClassLoader with kafka 07 jar will
- * will still load the 08 version class.
+ * The 0.7 consumer is loaded from the kafka 0.7 jar using a "parent last, child first" java class loader.
+ * Ordinary class loader is "parent first, child last", and kafka 0.8 and 0.7 both have classes for a lot of
+ * class names like "kafka.consumer.Consumer", etc., so ordinary java URLClassLoader with kafka 0.7 jar will
+ * still load the 0.8 version class.
*
- * As kafka 07 and kafka 08 used different version of zkClient, the zkClient jar used by kafka 07 should
+ * As kafka 0.7 and kafka 0.8 use different versions of zkClient, the zkClient jar used by kafka 0.7 should
* also be used by the class loader.
*
- * The user need to provide the configuration file for 07 consumer and 08 producer. For 08 producer,
- * the "serializer.class" filed is set to "kafka.serializer.DefaultEncode" by the code.
+ * The user needs to provide the configuration files for the 0.7 consumer and the 0.8 producer. For the 0.8 producer,
+ * the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
*/
-
-public class KafkaMigrationTool {
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class KafkaMigrationTool
+{
private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/utils/Utils.scala
index 1c88226,fe4c925..37b3975
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@@ -528,38 -573,4 +552,38 @@@ object Utils extends Logging
* This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
*/
def abs(n: Int) = n & 0x7fffffff // clears the sign bit: always >= 0, but NOT |n| for negative n (e.g. abs(-1) == Int.MaxValue, abs(Int.MinValue) == 0)
-
-}
++
+ /**
+ * Return s with its trailing oldSuffix replaced by newSuffix, e.g. replaceSuffix("a.log", ".log", ".index") == "a.index". Throws IllegalArgumentException if s doesn't end with oldSuffix.
+ */
+ def replaceSuffix(s: String, oldSuffix: String, newSuffix: String): String = {
+ if(!s.endsWith(oldSuffix))
+ throw new IllegalArgumentException("Expected string to end with '%s' but string is '%s'".format(oldSuffix, s))
+ s.substring(0, s.length - oldSuffix.length) + newSuffix
+ }
+
+ /**
+ * Create a new, empty file at the given path; the file must not already exist
+ * @param path The path to create
+ * @throws KafkaStorageException If the file could not be created (e.g. it already exists); File.createNewFile may also throw IOException
+ * @return The created file
+ */
+ def createFile(path: String): File = {
+ val f = new File(path)
+ val created = f.createNewFile() // returns false (rather than throwing) when the file already exists
+ if(!created)
+ throw new KafkaStorageException("Failed to create file %s.".format(path))
+ f
+ }
+
+ /**
+ * Read a 4-byte big-endian integer from bytes(offset) through bytes(offset + 3); offset + 3 must be a valid index, otherwise array access throws
+ */
+ def readInt(bytes: Array[Byte], offset: Int): Int = {
+ ((bytes(offset) & 0xFF) << 24) | // & 0xFF widens each signed Byte without sign extension before shifting
+ ((bytes(offset + 1) & 0xFF) << 16) |
+ ((bytes(offset + 2) & 0xFF) << 8) |
+ (bytes(offset + 3) & 0xFF)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 93a86e0,922a200..65a67e8
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@@ -394,10 -392,10 +392,11 @@@ class AsyncProducerTest extends JUnit3S
@Test
def testFailedSendRetryLogic() {
val props = new Properties()
+ props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+ props.put("request.required.acks", "1")
props.put("serializer.class", classOf[StringEncoder].getName.toString)
props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)
- props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+ props.put("producer.num.retries", 3.toString)
val config = new ProducerConfig(props)
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 48487e8,db46247..7430485
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@@ -46,11 -44,11 +46,11 @@@ class LogRecoveryTest extends JUnit3Sui
val message = "hello"
var producer: Producer[Int, String] = null
- var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0))
- var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0))
+ var hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
+ var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
- val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
+ val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
producerProps.put("request.required.acks", "-1")
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index 48dd335,0b6244f..f7f734f
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@@ -19,12 -19,12 +19,13 @@@ package kafka.util
import java.util.Arrays
import java.nio.ByteBuffer
+import java.io._
import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
- import org.junit.Test
import org.junit.Assert._
import kafka.common.KafkaException
+ import org.junit.{Test}
+ import kafka.tools.KafkaMigrationTool
class UtilsTest extends JUnitSuite {