Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/04 05:21:59 UTC

[37/37] git commit: merge from 0.8 and resolve conflicts

Updated Branches:
  refs/heads/trunk 82b11aa0d -> 4f2742d60


merge from 0.8 and resolve conflicts


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4f2742d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4f2742d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4f2742d6

Branch: refs/heads/trunk
Commit: 4f2742d60d16f5ba468aa66d2c3ed7aa37479dce
Parents: 82b11aa 92ecebe
Author: Jun Rao <ju...@gmail.com>
Authored: Sun Mar 3 20:20:41 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Mar 3 20:20:41 2013 -0800

----------------------------------------------------------------------
 bin/windows/kafka-run-class.bat                    |   33 +-
 config/log4j.properties                            |    3 +-
 contrib/hadoop-producer/README.md                  |  119 ++----
 .../java/kafka/bridge/examples/TextPublisher.java  |   14 +-
 .../kafka/bridge/hadoop/KafkaOutputFormat.java     |  103 ++---
 .../kafka/bridge/hadoop/KafkaRecordWriter.java     |   35 +-
 .../java/kafka/bridge/pig/AvroKafkaStorage.java    |    8 +-
 core/src/main/scala/kafka/admin/AdminUtils.scala   |   77 ++--
 .../kafka/admin/CheckReassignmentStatus.scala      |    2 +-
 .../scala/kafka/admin/CreateTopicCommand.scala     |   10 +-
 .../PreferredReplicaLeaderElectionCommand.scala    |    8 +-
 .../kafka/admin/ReassignPartitionsCommand.scala    |    5 +-
 .../main/scala/kafka/admin/ShutdownBroker.scala    |   36 +-
 core/src/main/scala/kafka/api/FetchRequest.scala   |    8 +-
 core/src/main/scala/kafka/api/FetchResponse.scala  |    4 +-
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |    2 +-
 .../src/main/scala/kafka/api/ProducerRequest.scala |   13 +-
 .../main/scala/kafka/api/ProducerResponse.scala    |    2 -
 .../main/scala/kafka/api/StopReplicaRequest.scala  |    2 +-
 .../scala/kafka/api/TopicMetadataRequest.scala     |    2 +-
 core/src/main/scala/kafka/cluster/Broker.scala     |   18 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |    5 +-
 .../kafka/common/LeaderNotAvailableException.scala |    7 +-
 .../kafka/consumer/ConsumerFetcherThread.scala     |    3 +-
 .../scala/kafka/consumer/ConsumerTopicStats.scala  |    8 +-
 .../consumer/FetchRequestAndResponseStats.scala    |    8 +-
 .../src/main/scala/kafka/consumer/TopicCount.scala |  127 +++---
 .../consumer/ZookeeperConsumerConnector.scala      |    9 +-
 .../scala/kafka/controller/KafkaController.scala   |    5 +-
 .../kafka/controller/PartitionStateMachine.scala   |    5 +-
 core/src/main/scala/kafka/log/FileMessageSet.scala |   16 +-
 core/src/main/scala/kafka/log/Log.scala            |   30 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala    |   36 +-
 .../scala/kafka/message/ByteBufferMessageSet.scala |   26 +-
 .../main/scala/kafka/network/RequestChannel.scala  |   12 +-
 .../main/scala/kafka/network/SocketServer.scala    |   33 +-
 .../main/scala/kafka/network/Transmission.scala    |   22 +-
 .../scala/kafka/producer/BrokerPartitionInfo.scala |    5 +-
 .../scala/kafka/producer/ConsoleProducer.scala     |   10 +-
 .../scala/kafka/producer/KafkaLog4jAppender.scala  |   13 +-
 core/src/main/scala/kafka/producer/Producer.scala  |   11 +-
 .../kafka/producer/ProducerRequestStats.scala      |    8 +-
 .../main/scala/kafka/producer/ProducerStats.scala  |    1 -
 .../scala/kafka/producer/ProducerTopicStats.scala  |   11 +-
 .../main/scala/kafka/producer/SyncProducer.scala   |   17 +-
 .../kafka/producer/async/DefaultEventHandler.scala |   52 ++-
 .../kafka/producer/async/ProducerSendThread.scala  |    2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |   27 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   37 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |    3 +-
 .../scala/kafka/server/KafkaRequestHandler.scala   |    6 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |    6 +-
 .../scala/kafka/server/KafkaServerStartable.scala  |    2 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   43 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   13 +
 .../main/scala/kafka/server/RequestPurgatory.scala |   32 +-
 .../scala/kafka/tools/ConsumerOffsetChecker.scala  |   32 +-
 .../main/scala/kafka/tools/ExportZkOffsets.scala   |   10 +-
 .../main/scala/kafka/tools/KafkaMigrationTool.java |  361 ++++++++++-----
 .../kafka/tools/VerifyConsumerRebalance.scala      |    5 +-
 .../scala/kafka/utils/ShutdownableThread.scala     |    5 +-
 core/src/main/scala/kafka/utils/Utils.scala        |   68 ++-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  151 +++----
 .../test/scala/unit/kafka/admin/AdminTest.scala    |   77 ++--
 .../api/RequestResponseSerializationTest.scala     |    2 +-
 .../unit/kafka/integration/PrimitiveApiTest.scala  |   55 ++-
 .../integration/ProducerConsumerTestHarness.scala  |   14 +-
 .../scala/unit/kafka/log/OffsetIndexTest.scala     |    8 +-
 .../unit/kafka/log4j/KafkaLog4jAppenderTest.scala  |    7 +-
 .../unit/kafka/network/SocketServerTest.scala      |   44 ++-
 .../unit/kafka/producer/AsyncProducerTest.scala    |   13 +-
 .../scala/unit/kafka/producer/ProducerTest.scala   |    3 +-
 .../unit/kafka/producer/SyncProducerTest.scala     |   70 ++--
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |    2 +-
 .../unit/kafka/server/ServerShutdownTest.scala     |    2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   25 +-
 .../test/scala/unit/kafka/utils/UtilsTest.scala    |    5 +-
 .../scala/kafka/perf/ProducerPerformance.scala     |    2 +-
 project/build/KafkaProject.scala                   |    2 +-
 .../config/mirror_consumer.properties              |    2 +-
 .../testcase_5003/testcase_5003_properties.json    |    2 +-
 81 files changed, 1200 insertions(+), 922 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/config/log4j.properties
----------------------------------------------------------------------
diff --cc config/log4j.properties
index b36d3e0,5692da0..1891f38
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@@ -48,14 -42,15 +48,15 @@@ log4j.appender.cleanerAppender.layout.C
  #log4j.logger.kafka.perf=DEBUG, kafkaAppender
  #log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
  #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
- log4j.logger.kafka=INFO
+ log4j.logger.kafka=INFO, kafkaAppender
  
 -log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
 +log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
  log4j.additivity.kafka.network.RequestChannel$=false
  
+ #log4j.logger.kafka.network.Processor=TRACE, requestAppender
  #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
  #log4j.additivity.kafka.server.KafkaApis=false
 -log4j.logger.kafka.request.logger=TRACE, requestAppender
 +log4j.logger.kafka.request.logger=WARN, requestAppender
  log4j.additivity.kafka.request.logger=false
  
  log4j.logger.kafka.controller=TRACE, stateChangeAppender
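
For reference, the merged config now ships the request loggers at WARN rather than TRACE. To temporarily restore the more verbose request logging while debugging, the settings removed above can be put back into config/log4j.properties:

  log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
  log4j.additivity.kafka.network.RequestChannel$=false
  log4j.logger.kafka.request.logger=TRACE, requestAppender
  log4j.additivity.kafka.request.logger=false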

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/FileMessageSet.scala
index a74abfe,ce27a19..0eef33e
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@@ -100,15 -71,13 +100,15 @@@ class FileMessageSet private[kafka](@vo
    }
    
    /**
-    * Search forward for the file position of the last offset that is great than or equal to the target offset 
+    * Search forward for the file position of the last offset that is greater than or equal to the target offset
     * and return its physical position. If no such offsets are found, return null.
 +   * @param targetOffset The offset to search for.
 +   * @param startingPosition The starting position in the file to begin searching from.
     */
 -  private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
 +  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
      var position = startingPosition
      val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
 -    val size = _size.get()
 +    val size = sizeInBytes()
      while(position + MessageSet.LogOverhead < size) {
        buffer.rewind()
        channel.read(buffer, position)
@@@ -126,27 -95,25 +126,37 @@@
    }
    
    /**
 -   * Write some of this set to the given channel, return the amount written
 +   * Write some of this set to the given channel.
 +   * @param destChannel The channel to write to.
 +   * @param writePosition The position in the message set to begin writing from.
 +   * @param size The maximum number of bytes to write
 +   * @return The number of bytes actually written.
     */
-   def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int =
-     channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt
+   def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
+     // Ensure that the underlying size has not changed.
 -    val newSize = scala.math.min(channel.size().toInt, limit) - start
++    val newSize = math.min(channel.size().toInt, end) - start
+     if (newSize < _size.get()) {
+       throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
+         .format(file.getAbsolutePath, _size.get(), newSize))
+     }
 -    val bytesTransferred = channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
++    val bytesTransferred = channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt
+     trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
 -      + " bytes requested for transfer : " + scala.math.min(size, sizeInBytes))
++      + " bytes requested for transfer : " + math.min(size, sizeInBytes))
+     bytesTransferred
+   }
    
    /**
 +   * Get a shallow iterator over the messages in the set.
 +   */
 +  override def iterator() = iterator(Int.MaxValue)
 +    
 +  /**
     * Get an iterator over the messages in the set. We only do shallow iteration here.
 +   * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory. 
 +   * If we encounter a message larger than this we throw an InvalidMessageException.
 +   * @return The iterator.
     */
 -  override def iterator: Iterator[MessageAndOffset] = {
 +  def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
      new IteratorTemplate[MessageAndOffset] {
        var location = start
        

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/Log.scala
index ac12b74,b2a7170..0cc03bb
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@@ -63,17 -119,16 +63,19 @@@ class Log(val dir: File
    private val unflushed = new AtomicInteger(0)
  
    /* last time it was flushed */
 -  private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
 +  private val lastflushedTime = new AtomicLong(time.milliseconds)
  
    /* the actual segments of the log */
 -  private[log] val segments: SegmentList[LogSegment] = loadSegments()
 +  private val segments: ConcurrentNavigableMap[Long,LogSegment] = loadSegments()
 +  
 +  /* The number of times the log has been truncated */
 +  private val truncates = new AtomicInteger(0)
      
    /* Calculate the offset of the next message */
 -  private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
 -  
 +  private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
 +
+   debug("Completed load of log %s with log end offset %d".format(name, logEndOffset))
+ 
    newGauge(name + "-" + "NumLogSegments",
             new Gauge[Int] { def getValue = numberOfSegments })
  
@@@ -148,36 -169,74 +150,44 @@@
      }
  
      if(logSegments.size == 0) {
 -      // no existing segments, create a new mutable segment
 -      logSegments.add(new LogSegment(dir = dir, 
 +      // no existing segments, create a new mutable segment beginning at offset 0
 +      logSegments.put(0,
 +                      new LogSegment(dir = dir, 
                                       startOffset = 0,
 -                                     indexIntervalBytes = indexIntervalBytes, 
 -                                     maxIndexSize = maxIndexSize))
 +                                     indexIntervalBytes = config.indexInterval, 
 +                                     maxIndexSize = config.maxIndexSize))
      } else {
 -      // there is at least one existing segment, validate and recover them/it
 -      // sort segments into ascending order for fast searching
 -      Collections.sort(logSegments, new Comparator[LogSegment] {
 -        def compare(s1: LogSegment, s2: LogSegment): Int = {
 -          if(s1.start == s2.start) 0
 -          else if(s1.start < s2.start) -1
 -          else 1
 -        }
 -      })
 -
 -      // reset the index size of the last (current active) log segment to its maximum value
 -      logSegments.get(logSegments.size() - 1).index.resize(maxIndexSize)
 -
 -      // run recovery on the last segment if necessary
 -      if(needsRecovery)
 -        recoverSegment(logSegments.get(logSegments.size - 1))
 +      // reset the index size of the currently active log segment to allow more entries
 +      val active = logSegments.lastEntry.getValue
 +      active.index.resize(config.maxIndexSize)
 +
 +      // run recovery on the active segment if necessary
 +      if(needsRecovery) {
 +        info("Recovering active segment of %s.".format(name))
 +        active.recover(config.maxMessageSize)
 +      }
      }
+ 
 -    val segmentList = logSegments.toArray(new Array[LogSegment](logSegments.size))
+     // Check for the index file of every segment, if it's empty or its last offset is greater than its base offset.
 -    for (s <- segmentList) {
++    for (s <- asIterable(logSegments.values)) {
+       require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset,
+               "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
 -                .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
++              .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
+     }
+ 
 -    new SegmentList(segmentList)
 +    logSegments
    }
 -  
 +
    /**
 -   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log.
 +   * The number of segments in the log.
 +   * Take care! this is an O(n) operation.
     */
 -  private def recoverSegment(segment: LogSegment) {
 -    info("Recovering log segment %s".format(segment.messageSet.file.getAbsolutePath))
 -    segment.index.truncate()
 -    var validBytes = 0
 -    var lastIndexEntry = 0
 -    val iter = segment.messageSet.iterator
 -    try {
 -      while(iter.hasNext) {
 -        val entry = iter.next
 -        entry.message.ensureValid()
 -        if(validBytes - lastIndexEntry > indexIntervalBytes) {
 -          segment.index.append(entry.offset, validBytes)
 -          lastIndexEntry = validBytes
 -        }
 -        validBytes += MessageSet.entrySize(entry.message)
 -      }
 -    } catch {
 -      case e: InvalidMessageException => 
 -        logger.warn("Found invalid messages in log " + name)
 -    }
 -    val truncated = segment.messageSet.sizeInBytes - validBytes
 -    if(truncated > 0)
 -      warn("Truncated " + truncated + " invalid bytes from the log " + name + ".")
 -    segment.messageSet.truncateTo(validBytes)
 -  }
 -
 +  def numberOfSegments: Int = segments.size
 +  
    /**
 -   * The number of segments in the log
 +   * The number of truncates that have occurred since the log was opened.
     */
 -  def numberOfSegments: Int = segments.view.length
 +  def numberOfTruncates: Int = truncates.get
  
    /**
     * Close this log
@@@ -196,55 -255,76 +206,67 @@@
     * This method will generally be responsible for assigning offsets to the messages, 
     * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
     * 
 -   * Returns a tuple containing (first_offset, last_offset) for the newly appended of the message set, 
 -   * or (-1,-1) if the message set is empty
 +   * @param messages The message set to append
 +   * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
 +   * 
 +   * @throws KafkaStorageException If the append fails due to an I/O error.
 +   * 
 +   * @return Information about the appended messages including the first and last offset
     */
 -  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = {
 -    val messageSetInfo = analyzeAndValidateMessageSet(messages)
 +  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
 +    val appendInfo = analyzeAndValidateMessageSet(messages)
      
      // if we have any valid messages, append them to the log
 -    if(messageSetInfo.count == 0) {
 -      (-1L, -1L)
 -    } else {
 -      BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
 -      BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(messageSetInfo.count)
 -
 -      // trim any invalid bytes or partial messages before appending it to the on-disk log
 -      var validMessages = trimInvalidBytes(messages)
 +    if(appendInfo.count == 0)
 +      return appendInfo
 +      
 +    // trim any invalid bytes or partial messages before appending it to the on-disk log
 +    var validMessages = trimInvalidBytes(messages)
  
 -      try {
 -        // they are valid, insert them in the log
 -        val offsets = lock synchronized {
 -          // maybe roll the log if this segment is full
 -          val segment = maybeRoll(segments.view.last)
 +    try {
 +      // they are valid, insert them in the log
 +      lock synchronized {
 +        // maybe roll the log if this segment is full
 +        val segment = maybeRoll()
            
 +        if(assignOffsets) {
            // assign offsets to the messageset
 -          val offsets = 
 -            if(assignOffsets) {
 -              val offsetCounter = new AtomicLong(nextOffset.get)
 -              val firstOffset = offsetCounter.get
 -              try {
 -                validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
 -              } catch {
 -                case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
 -              }
 -              val lastOffset = offsetCounter.get - 1
 -              (firstOffset, lastOffset)
 -            } else {
 -              require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)
 -              require(messageSetInfo.firstOffset >= nextOffset.get, 
 -                      "Attempt to append a message set beginning with offset %d to a log with log end offset %d."
 -                      .format(messageSetInfo.firstOffset, nextOffset.get))
 -              (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
 -            }
 -
 -          // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
 -          // happens with the new message size (after re-compression, if any)
 -          for(messageAndOffset <- validMessages.shallowIterator) {
 -            if(MessageSet.entrySize(messageAndOffset.message) > maxMessageSize)
 -              throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
 -                .format(MessageSet.entrySize(messageAndOffset.message), maxMessageSize))
 +          appendInfo.firstOffset = nextOffset.get
 +          val offset = new AtomicLong(nextOffset.get)
-           validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
++          try {
++            validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
++          } catch {
++            case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
+           }
 +          appendInfo.lastOffset = offset.get - 1
 +        } else {
 +          // we are taking the offsets we are given
 +          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
 +            throw new IllegalArgumentException("Out of order offsets found in " + messages)
 +        }
  
 -          // now append to the log
 -          trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s"
 -                .format(this.name, offsets._1, nextOffset.get(), validMessages))
 -          segment.append(offsets._1, validMessages)
 -          
 -          // advance the log end offset
 -          nextOffset.set(offsets._2 + 1)
 -          
 -          // return the offset at which the messages were appended
 -          offsets
++        // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
++        // happens with the new message size (after re-compression, if any)
++        for(messageAndOffset <- validMessages.shallowIterator) {
++          if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize)
++            throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
++              .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
+         }
++
 +        // now append to the log
 +        trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset))
 +        segment.append(appendInfo.firstOffset, validMessages)
          
 +        // increment the log end offset
 +        nextOffset.set(appendInfo.lastOffset + 1)
 +          
          // maybe flush the log and index
 -        maybeFlush(messageSetInfo.count)
 +        maybeFlush(appendInfo.count)
          
 -        // return the first and last offset
 -        offsets
 -      } catch {
 -        case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
 +        appendInfo
        }
 +    } catch {
 +      case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
      }
    }
    
@@@ -259,21 -333,16 +281,20 @@@
    
    /**
     * Validate the following:
 -   * 1. each message matches its CRC
 +   * <ol>
-    * <li> each message is not too large
 +   * <li> each message matches its CRC
 +   * </ol>
     * 
     * Also compute the following quantities:
 -   * 1. First offset in the message set
 -   * 2. Last offset in the message set
 -   * 3. Number of messages
 -   * 4. Whether the offsets are monotonically increasing
 -   * 5. Whether any compression codec is used (if many are used, then the last one is given)
 +   * <ol>
 +   * <li> First offset in the message set
 +   * <li> Last offset in the message set
 +   * <li> Number of messages
 +   * <li> Whether the offsets are monotonically increasing
 +   * <li> Whether any compression codec is used (if many are used, then the last one is given)
 +   * </ol>
     */
 -  private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): MessageSetAppendInfo = {
 +  private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
      var messageCount = 0
      var firstOffset, lastOffset = -1L
      var codec: CompressionCodec = NoCompressionCodec
@@@ -288,12 -357,10 +309,9 @@@
        // update the last offset seen
        lastOffset = messageAndOffset.offset
  
-       // check the validity of the message by checking CRC and message size
+       // check the validity of the message by checking CRC
        val m = messageAndOffset.message
        m.ensureValid()
-       if(MessageSet.entrySize(m) > config.maxMessageSize)
-         throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(m), config.maxMessageSize))
-       
 -
        messageCount += 1;
        
        val messageCodec = m.compressionCodec
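
For reference, a minimal sketch of calling the reworked append API from the hunk above. It assumes only what the hunk shows: append returns a LogAppendInfo carrying count, firstOffset and lastOffset, and I/O failures surface as KafkaStorageException:

  import kafka.log.Log
  import kafka.message.ByteBufferMessageSet

  // Append with log-assigned offsets and report the range that was actually written.
  def appendAndReport(log: Log, messages: ByteBufferMessageSet): Unit = {
    val info = log.append(messages, assignOffsets = true)
    if (info.count > 0)
      println("appended %d messages at offsets %d..%d".format(info.count, info.firstOffset, info.lastOffset))
  }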

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/OffsetIndex.scala
index 23e659f,e806da9..eff213e
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@@ -90,9 -90,12 +90,9 @@@ class OffsetIndex(@volatile var file: F
    /* the last offset in the index */
    var lastOffset = readLastOffset()
    
-   info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d"
-        .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset))
+   info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
+     .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
  
 -  /* the maximum number of entries this index can hold */
 -  def maxEntries = mmap.limit / 8
 -
    /**
     * The last offset written to the index
     */
@@@ -132,14 -126,10 +132,14 @@@
    /**
     * Find the slot in which the largest offset less than or equal to the given
     * target offset is stored.
 -   * Return -1 if the least entry in the index is larger than the target offset or the index is empty
 +   * 
 +   * @param idx The index buffer
 +   * @param targetOffset The offset to look for
 +   * 
 +   * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
     */
    private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
-     // we only store the difference from the baseoffset so calculate that
+     // we only store the difference from the base offset so calculate that
      val relOffset = targetOffset - baseOffset
      
      // check if the index is empty
@@@ -207,9 -194,9 +206,9 @@@
    def isFull: Boolean = entries >= this.maxEntries
    
    /**
 -   * Truncate the entire index
 +   * Truncate the entire index, deleting all entries
     */
-   def truncate() = truncateTo(this.baseOffset)
+   def truncate() = truncateToEntries(0)
    
    /**
     * Remove all entries from the index which have an offset greater than or equal to the given offset.
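
For reference, each index entry occupies 8 bytes (per the removed "maxEntries = mmap.limit / 8" line above): a 4-byte offset stored relative to baseOffset plus a 4-byte file position, which is why indexSlotFor first converts the target to a relative offset. A purely illustrative helper, not code from the class:

  // Relative offset that an index with the given base would store for an absolute offset.
  def relativeOffset(baseOffset: Long, absoluteOffset: Long): Int = {
    val rel = absoluteOffset - baseOffset
    require(rel >= 0 && rel <= Int.MaxValue,
            "offset %d is out of range for an index with base offset %d".format(absoluteOffset, baseOffset))
    rel.toInt
  }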

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaApis.scala
index 5c5dbc9,cfabfc1..f30dca1
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@@ -21,11 -21,8 +21,10 @@@ import kafka.admin.{CreateTopicCommand
  import kafka.api._
  import kafka.message._
  import kafka.network._
 +import kafka.log._
 +import kafka.utils.ZKGroupTopicDirs
  import org.apache.log4j.Logger
  import scala.collection._
- import kafka.network.RequestChannel.Response
  import java.util.concurrent.TimeUnit
  import java.util.concurrent.atomic._
  import kafka.metrics.KafkaMetricsGroup
@@@ -190,12 -190,7 +195,12 @@@ class KafkaApis(val requestChannel: Req
        try {
          val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
          val log = localReplica.log.get
 -        val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
 +        val info = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
 +        
 +        // update stats
 +        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(info.count)
-         BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(info.count)
++        BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(info.count)
 +        
          // we may need to increment high watermark since ISR could be down to 1
          localReplica.partition.maybeIncrementLeaderHW(localReplica)
          trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/ReplicaManager.scala
index 710c08b,f7fe0de..573601f
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@@ -50,7 -48,8 +50,8 @@@ class ReplicaManager(val config: KafkaC
    val replicaFetcherManager = new ReplicaFetcherManager(config, this)
    this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
    private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
 -  val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
 +  val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
+   private var hwThreadInitialized = false
  
    newGauge(
      "LeaderCount",

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index 334c9c1,f3a5095..2804908
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@@ -39,24 -40,23 +40,24 @@@ import java.util.concurrent.atomic.Atom
  
  
  /**
-  * The kafka 07 to 08 online migration tool, it's used for migrating data from 07 to 08 cluster. Internally,
-  * it's composed of a kafka 07 consumer and kafka 08 producer. The kafka 07 consumer consumes data from the
-  * 07 cluster, and the kafka 08 producer produces data to the 08 cluster.
+  * This is a  kafka 0.7 to 0.8 online migration tool used for migrating data from 0.7 to 0.8 cluster. Internally,
+  * it's composed of a kafka 0.7 consumer and kafka 0.8 producer. The kafka 0.7 consumer consumes data from the
+  * 0.7 cluster, and the kafka 0.8 producer produces data to the 0.8 cluster.
   *
-  * The 07 consumer is loaded from kafka 07 jar using a "parent last, child first" java class loader.
-  * Ordinary class loader is "parent first, child last", and kafka 08 and 07 both have classes for a lot of
-  * class names like "kafka.consumer.Consumer", etc., so ordinary java URLClassLoader with kafka 07 jar will
-  * will still load the 08 version class.
+  * The 0.7 consumer is loaded from kafka 0.7 jar using a "parent last, child first" java class loader.
+  * Ordinary class loader is "parent first, child last", and kafka 0.8 and 0.7 both have classes for a lot of
+  * class names like "kafka.consumer.Consumer", etc., so ordinary java URLClassLoader with kafka 0.7 jar will
+  * will still load the 0.8 version class.
   *
-  * As kafka 07 and kafka 08 used different version of zkClient, the zkClient jar used by kafka 07 should
+  * As kafka 0.7 and kafka 0.8 used different version of zkClient, the zkClient jar used by kafka 0.7 should
   * also be used by the class loader.
   *
-  * The user need to provide the configuration file for 07 consumer and 08 producer. For 08 producer,
-  * the "serializer.class" filed is set to "kafka.serializer.DefaultEncode" by the code.
+  * The user need to provide the configuration file for 0.7 consumer and 0.8 producer. For 0.8 producer,
+  * the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
   */
 -
 -public class KafkaMigrationTool {
 +@SuppressWarnings({"unchecked", "rawtypes"})
 +public class KafkaMigrationTool
 +{
    private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
    private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
    private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
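
The class comment above describes the key trick: a "parent last, child first" class loader, so that classes from the 0.7 jar shadow the same-named 0.8 classes already on the tool's classpath. The tool's own loader is not part of this hunk; the sketch below only illustrates the general technique, with a hypothetical class name:

  import java.net.{URL, URLClassLoader}

  // Parent-last loader: resolve against the child URLs (e.g. the kafka 0.7 jar and its
  // zkclient) first, and only fall back to the parent class loader on failure.
  class ParentLastClassLoader(urls: Array[URL], parent: ClassLoader) extends URLClassLoader(urls, parent) {
    override def loadClass(name: String, resolve: Boolean): Class[_] = synchronized {
      val c = Option(findLoadedClass(name)).getOrElse {
        try findClass(name)
        catch { case _: ClassNotFoundException => super.loadClass(name, resolve) }
      }
      if (resolve) resolveClass(c)
      c
    }
  }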

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/utils/Utils.scala
index 1c88226,fe4c925..37b3975
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@@ -528,38 -573,4 +552,38 @@@ object Utils extends Logging 
     * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
     */
    def abs(n: Int) = n & 0x7fffffff
-   
 -}
++
 +  /**
 +   * Replace the given string suffix with the new suffix. If the string doesn't end with the given suffix throw an exception.
 +   */
 +  def replaceSuffix(s: String, oldSuffix: String, newSuffix: String): String = {
 +    if(!s.endsWith(oldSuffix))
 +      throw new IllegalArgumentException("Expected string to end with '%s' but string is '%s'".format(oldSuffix, s))
 +    s.substring(0, s.length - oldSuffix.length) + newSuffix
 +  }
 +
 +  /**
 +   * Create a file with the given path
 +   * @param path The path to create
 +   * @throw KafkaStorageException If the file create fails
 +   * @return The created file
 +   */
 +  def createFile(path: String): File = {
 +    val f = new File(path)
 +    val created = f.createNewFile()
 +    if(!created)
 +      throw new KafkaStorageException("Failed to create file %s.".format(path))
 +    f
 +  }
 +  
 +  /**
 +   * Read a big-endian integer from a byte array
 +   */
 +  def readInt(bytes: Array[Byte], offset: Int): Int = {
 +    ((bytes(offset) & 0xFF) << 24) |
 +    ((bytes(offset + 1) & 0xFF) << 16) |
 +    ((bytes(offset + 2) & 0xFF) << 8) |
 +    (bytes(offset + 3) & 0xFF)
 +  }
 +  
 +}
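
For reference, a few REPL-style examples of the helpers added above (all values purely illustrative):

  import kafka.utils.Utils

  // replaceSuffix: e.g. turning a swap file name back into a log file name.
  Utils.replaceSuffix("00000000000000000000.log.swap", ".log.swap", ".log")  // "00000000000000000000.log"

  // readInt is big-endian, so the last byte is the least significant.
  Utils.readInt(Array[Byte](0, 0, 1, 2), 0)                                  // 0x00000102 == 258

  // abs as defined above never goes negative, even for Int.MinValue.
  Utils.abs(Int.MinValue)                                                    // 0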

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 93a86e0,922a200..65a67e8
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@@ -394,10 -392,10 +392,11 @@@ class AsyncProducerTest extends JUnit3S
    @Test
    def testFailedSendRetryLogic() {
      val props = new Properties()
+     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+     props.put("request.required.acks", "1")
      props.put("serializer.class", classOf[StringEncoder].getName.toString)
      props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)
-     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
 +    props.put("producer.num.retries", 3.toString)
  
      val config = new ProducerConfig(props)
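
For context, the retry test now has to configure the producer explicitly. Below is a standalone sketch of an equivalent 0.8 producer configuration outside the test harness; the broker address is a placeholder and the fully-qualified encoder class names are assumptions, since the test derives them via TestUtils and classOf:

  import java.util.Properties
  import kafka.producer.ProducerConfig

  val props = new Properties()
  props.put("broker.list", "localhost:9092")                       // placeholder broker list
  props.put("request.required.acks", "1")
  props.put("serializer.class", "kafka.serializer.StringEncoder")  // assumed package
  props.put("key.serializer.class", "kafka.serializer.NullEncoder")
  props.put("producer.num.retries", "3")
  val config = new ProducerConfig(props)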
  

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 48487e8,db46247..7430485
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@@ -46,11 -44,11 +46,11 @@@ class LogRecoveryTest extends JUnit3Sui
    val message = "hello"
  
    var producer: Producer[Int, String] = null
 -  var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0))
 -  var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0))
 +  var hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
 +  var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
    var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
    
-   val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
+   val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
    producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
    producerProps.put("request.required.acks", "-1")
  

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f2742d6/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index 48dd335,0b6244f..f7f734f
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@@ -19,12 -19,12 +19,13 @@@ package kafka.util
  
  import java.util.Arrays
  import java.nio.ByteBuffer
 +import java.io._
  import org.apache.log4j.Logger
  import org.scalatest.junit.JUnitSuite
- import org.junit.Test
  import org.junit.Assert._
  import kafka.common.KafkaException
+ import org.junit.{Test}
+ import kafka.tools.KafkaMigrationTool
  
  
  class UtilsTest extends JUnitSuite {