You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2013/01/16 00:43:41 UTC

git commit: KAFKA-698 Avoid advancing the log end offset until the append has actually happened since reads may be happening in the meantime.

Updated Branches:
  refs/heads/0.8 3696d7281 -> de1a4d727


KAFKA-698 Avoid advancing the log end offset until the append has actually happened since reads may be happening in the meantime.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/de1a4d72
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/de1a4d72
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/de1a4d72

Branch: refs/heads/0.8
Commit: de1a4d727693c39be19fd9db427746fa6c8a4a12
Parents: 3696d72
Author: Jay Kreps <ja...@gmail.com>
Authored: Mon Jan 14 09:29:52 2013 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Jan 15 15:43:10 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala |   11 +++++++----
 1 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/de1a4d72/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 79db610..560be19 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -269,14 +269,14 @@ private[kafka] class Log(val dir: File,
           // assign offsets to the messageset
           val offsets = 
             if(assignOffsets) {
-              val firstOffset = nextOffset.get
-              validMessages = validMessages.assignOffsets(nextOffset, messageSetInfo.codec)
-              val lastOffset = nextOffset.get - 1
+              val offsetCounter = new AtomicLong(nextOffset.get)
+              val firstOffset = offsetCounter.get
+              validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
+              val lastOffset = offsetCounter.get - 1
               (firstOffset, lastOffset)
             } else {
               if(!messageSetInfo.offsetsMonotonic)
                 throw new IllegalArgumentException("Out of order offsets found in " + messages)
-              nextOffset.set(messageSetInfo.lastOffset + 1)
               (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
             }
           
@@ -285,6 +285,9 @@ private[kafka] class Log(val dir: File,
                 .format(this.name, offsets._1, nextOffset.get(), validMessages))
           segment.append(offsets._1, validMessages)
           
+          // advance the log end offset
+          nextOffset.set(offsets._2 + 1)
+          
           // return the offset at which the messages were appended
           offsets
         }