You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/17 19:18:46 UTC
[7/7] git commit: merge from 0.8 and resolve conflict in Log
Updated Branches:
refs/heads/trunk 362eba981 -> 9ee795ac5
merge from 0.8 and resolve conflict in Log
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ee795ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ee795ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ee795ac
Branch: refs/heads/trunk
Commit: 9ee795ac563c3ce4c4f03e022c7f951e065ad1ed
Parents: 362eba9 214a0af
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Jan 17 10:17:30 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jan 17 10:17:30 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/api/FetchRequest.scala | 36 +++++++++++---
.../kafka/consumer/ConsumerFetcherManager.scala | 8 +++-
.../kafka/consumer/ConsumerFetcherThread.scala | 7 +++-
.../scala/kafka/consumer/PartitionTopicInfo.scala | 9 +++-
.../consumer/ZookeeperConsumerConnector.scala | 31 ++-----------
.../main/scala/kafka/javaapi/FetchRequest.scala | 6 +--
core/src/main/scala/kafka/log/Log.scala | 11 +++--
.../main/scala/kafka/network/RequestChannel.scala | 2 +-
.../scala/kafka/server/AbstractFetcherThread.scala | 21 ++++++---
.../scala/kafka/server/ReplicaFetcherThread.scala | 9 +++-
.../unit/kafka/consumer/ConsumerIteratorTest.scala | 1 -
.../consumer/ZookeeperConsumerConnectorTest.scala | 1 -
.../scala/unit/kafka/integration/FetcherTest.scala | 1 -
13 files changed, 81 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ee795ac/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ee795ac/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ee795ac/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/Log.scala
index f6a32b6,560be19..5ea9489
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@@ -170,52 -244,62 +170,53 @@@ class Log(val dir: File
* This method will generally be responsible for assigning offsets to the messages,
* however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
*
- * Returns a tuple containing (first_offset, last_offset) for the newly appended of the message set,
- * or (-1,-1) if the message set is empty
+ * @param messages The message set to append
+ * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
+ *
+ * @throws KafkaStorageException If the append fails due to an I/O error.
+ *
+ * @return Information about the appended messages including the first and last offset
*/
- def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = {
- val messageSetInfo = analyzeAndValidateMessageSet(messages)
+ def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
+ val appendInfo = analyzeAndValidateMessageSet(messages)
// if we have any valid messages, append them to the log
- if(messageSetInfo.count == 0) {
- (-1L, -1L)
- } else {
- BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
- BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(messageSetInfo.count)
-
- // trim any invalid bytes or partial messages before appending it to the on-disk log
- var validMessages = trimInvalidBytes(messages)
+ if(appendInfo.count == 0)
+ return appendInfo
+
+ // trim any invalid bytes or partial messages before appending it to the on-disk log
+ var validMessages = trimInvalidBytes(messages)
- try {
- // they are valid, insert them in the log
- val offsets = lock synchronized {
- // maybe roll the log if this segment is full
- val segment = maybeRoll(segments.view.last)
-
- // assign offsets to the messageset
- val offsets =
- if(assignOffsets) {
- val offsetCounter = new AtomicLong(nextOffset.get)
- val firstOffset = offsetCounter.get
- validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
- val lastOffset = offsetCounter.get - 1
- (firstOffset, lastOffset)
- } else {
- if(!messageSetInfo.offsetsMonotonic)
- throw new IllegalArgumentException("Out of order offsets found in " + messages)
- (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
- }
-
- // now append to the log
- trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s"
- .format(this.name, offsets._1, nextOffset.get(), validMessages))
- segment.append(offsets._1, validMessages)
-
- // advance the log end offset
- nextOffset.set(offsets._2 + 1)
+ try {
+ // they are valid, insert them in the log
+ lock synchronized {
+ // maybe roll the log if this segment is full
+ val segment = maybeRoll()
- // return the offset at which the messages were appended
- offsets
+ if(assignOffsets) {
+ // assign offsets to the messageset
+ appendInfo.firstOffset = nextOffset.get
- validMessages = validMessages.assignOffsets(nextOffset, appendInfo.codec)
- appendInfo.lastOffset = nextOffset.get - 1
++ val offsetCounter = new AtomicLong(nextOffset.get)
++ validMessages = validMessages.assignOffsets(offsetCounter, appendInfo.codec)
++ appendInfo.lastOffset = offsetCounter.get - 1
+ } else {
+ // we are taking the offsets we are given
+ if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
+ throw new IllegalArgumentException("Out of order offsets found in " + messages)
- nextOffset.set(appendInfo.lastOffset + 1)
}
-
-
++
+ // now append to the log
+ trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset))
+ segment.append(appendInfo.firstOffset, validMessages)
-
++ nextOffset.set(appendInfo.lastOffset + 1)
++
// maybe flush the log and index
- maybeFlush(messageSetInfo.count)
+ maybeFlush(appendInfo.count)
- // return the first and last offset
- offsets
- } catch {
- case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
+ appendInfo
}
+ } catch {
+ case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ee795ac/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 6ba93ea,79b3fa3..5097c26
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@@ -64,9 -63,13 +63,13 @@@ class ReplicaFetcherThread(name:String
replicaId = brokerConfig.brokerId,
requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
)
- val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+ val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+ val offset = partitionErrorAndOffset.error match {
+ case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
+ case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+ }
val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
- replica.log.get.truncateAndStartWithNewOffset(offset)
+ replica.log.get.truncateFullyAndStartAt(offset)
offset
}