Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/17 19:18:46 UTC

[7/7] git commit: merge from 0.8 and resolve conflict in Log

Updated Branches:
  refs/heads/trunk 362eba981 -> 9ee795ac5


merge from 0.8 and resolve conflict in Log


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ee795ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ee795ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ee795ac

Branch: refs/heads/trunk
Commit: 9ee795ac563c3ce4c4f03e022c7f951e065ad1ed
Parents: 362eba9 214a0af
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Jan 17 10:17:30 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jan 17 10:17:30 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/FetchRequest.scala   |   36 +++++++++++---
 .../kafka/consumer/ConsumerFetcherManager.scala    |    8 +++-
 .../kafka/consumer/ConsumerFetcherThread.scala     |    7 +++-
 .../scala/kafka/consumer/PartitionTopicInfo.scala  |    9 +++-
 .../consumer/ZookeeperConsumerConnector.scala      |   31 ++-----------
 .../main/scala/kafka/javaapi/FetchRequest.scala    |    6 +--
 core/src/main/scala/kafka/log/Log.scala            |   11 +++--
 .../main/scala/kafka/network/RequestChannel.scala  |    2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |   21 ++++++---
 .../scala/kafka/server/ReplicaFetcherThread.scala  |    9 +++-
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |    1 -
 .../consumer/ZookeeperConsumerConnectorTest.scala  |    1 -
 .../scala/unit/kafka/integration/FetcherTest.scala |    1 -
 13 files changed, 81 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9ee795ac/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ee795ac/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/9ee795ac/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/Log.scala
index f6a32b6,560be19..5ea9489
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@@ -170,52 -244,62 +170,53 @@@ class Log(val dir: File
    * This method will generally be responsible for assigning offsets to the messages;
    * however, if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
     * 
 -   * Returns a tuple containing (first_offset, last_offset) of the newly appended message set,
 -   * or (-1,-1) if the message set is empty
 +   * @param messages The message set to append
 +   * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
 +   * 
 +   * @throws KafkaStorageException If the append fails due to an I/O error.
 +   * 
 +   * @return Information about the appended messages including the first and last offset
     */
 -  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = {
 -    val messageSetInfo = analyzeAndValidateMessageSet(messages)
 +  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
 +    val appendInfo = analyzeAndValidateMessageSet(messages)
      
      // if we have any valid messages, append them to the log
 -    if(messageSetInfo.count == 0) {
 -      (-1L, -1L)
 -    } else {
 -      BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
 -      BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(messageSetInfo.count)
 -
 -      // trim any invalid bytes or partial messages before appending it to the on-disk log
 -      var validMessages = trimInvalidBytes(messages)
 +    if(appendInfo.count == 0)
 +      return appendInfo
 +      
 +    // trim any invalid bytes or partial messages before appending it to the on-disk log
 +    var validMessages = trimInvalidBytes(messages)
  
 -      try {
 -        // they are valid, insert them in the log
 -        val offsets = lock synchronized {
 -          // maybe roll the log if this segment is full
 -          val segment = maybeRoll(segments.view.last)
 -          
 -          // assign offsets to the messageset
 -          val offsets = 
 -            if(assignOffsets) {
 -              val offsetCounter = new AtomicLong(nextOffset.get)
 -              val firstOffset = offsetCounter.get
 -              validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
 -              val lastOffset = offsetCounter.get - 1
 -              (firstOffset, lastOffset)
 -            } else {
 -              if(!messageSetInfo.offsetsMonotonic)
 -                throw new IllegalArgumentException("Out of order offsets found in " + messages)
 -              (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
 -            }
 -          
 -          // now append to the log
 -          trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s"
 -                .format(this.name, offsets._1, nextOffset.get(), validMessages))
 -          segment.append(offsets._1, validMessages)
 -          
 -          // advance the log end offset
 -          nextOffset.set(offsets._2 + 1)
 +    try {
 +      // they are valid, insert them in the log
 +      lock synchronized {
 +        // maybe roll the log if this segment is full
 +        val segment = maybeRoll()
            
 -          // return the offset at which the messages were appended
 -          offsets
 +        if(assignOffsets) {
 +           // assign offsets to the message set
 +          appendInfo.firstOffset = nextOffset.get
-           validMessages = validMessages.assignOffsets(nextOffset, appendInfo.codec)
-           appendInfo.lastOffset = nextOffset.get - 1
++          val offsetCounter = new AtomicLong(nextOffset.get)
++          validMessages = validMessages.assignOffsets(offsetCounter, appendInfo.codec)
++          appendInfo.lastOffset = offsetCounter.get - 1
 +        } else {
 +          // we are taking the offsets we are given
 +          if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
 +              throw new IllegalArgumentException("Out of order offsets found in " + messages)
-           nextOffset.set(appendInfo.lastOffset + 1)
          }
-           
 -        
++
 +        // now append to the log
 +        trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset))
 +        segment.append(appendInfo.firstOffset, validMessages)
-           
++        nextOffset.set(appendInfo.lastOffset + 1)
++
          // maybe flush the log and index
 -        maybeFlush(messageSetInfo.count)
 +        maybeFlush(appendInfo.count)
          
 -        // return the first and last offset
 -        offsets
 -      } catch {
 -        case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
 +        appendInfo
        }
 +    } catch {
 +      case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
      }
    }
    

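The Log.scala hunk above changes append's return type from a (firstOffset, lastOffset) tuple to a LogAppendInfo object, and the conflict resolution (the ++ lines) assigns offsets through a local AtomicLong so that nextOffset is only advanced after segment.append succeeds. Below is a minimal sketch of the shape implied by the hunk; the field names are taken only from the appendInfo usages visible here, and the real class in kafka.log may carry more fields (e.g. the codec referenced above):

    // Sketch only: fields inferred from the appendInfo usages in this hunk;
    // the actual LogAppendInfo definition in kafka.log may differ.
    case class LogAppendInfo(var firstOffset: Long,     // first offset assigned to the message set
                             var lastOffset: Long,      // last offset assigned to the message set
                             count: Int,                // number of valid messages in the set
                             offsetsMonotonic: Boolean) // whether the supplied offsets increase

    // Callers that previously destructured a (first, last) tuple and
    // special-cased (-1, -1) for an empty set now read the fields directly:
    def describeAppend(info: LogAppendInfo): String =
      if (info.count == 0) "nothing appended"
      else s"appended offsets ${info.firstOffset} to ${info.lastOffset}"
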
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ee795ac/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 6ba93ea,79b3fa3..5097c26
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@@ -64,9 -63,13 +63,13 @@@ class ReplicaFetcherThread(name:String
        replicaId = brokerConfig.brokerId,
        requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
      )
-     val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+     val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+     val offset = partitionErrorAndOffset.error match {
+       case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
+       case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+     }
      val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
 -    replica.log.get.truncateAndStartWithNewOffset(offset)
 +    replica.log.get.truncateFullyAndStartAt(offset)
      offset
    }
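
The ReplicaFetcherThread hunk stops assuming a successful offset response: instead of taking offsets.head unconditionally, it matches on the error code and rethrows via ErrorMapping.exceptionFor. A self-contained sketch of the same guard pattern, with simplified stand-ins for ErrorMapping and the response type (only the shape of the match is taken from the diff):

    // Stand-ins for kafka.common.ErrorMapping and the offset response; the
    // real types differ, this just illustrates the check-before-use pattern.
    object Errors { val NoError: Short = 0 }
    case class PartitionErrorAndOffsets(error: Short, offsets: Seq[Long])

    def firstOffsetOrThrow(p: PartitionErrorAndOffsets): Long = p.error match {
      case Errors.NoError => p.offsets.head   // broker answered; head is the requested earliest offset
      case code           => throw new RuntimeException(s"offset request failed with error code $code")
    }

Without the guard, an error response carrying an empty offsets list would surface as a NoSuchElementException from offsets.head rather than as the broker's actual error.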