You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/13 19:51:04 UTC

git commit: KAFKA-802 Flush message interval is based on compressed message count; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 dd9676163 -> c5462864a


KAFKA-802 Flush message interval is based on compressed message count; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c5462864
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c5462864
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c5462864

Branch: refs/heads/0.8
Commit: c5462864aab05b98158bcbe623123db083b8e136
Parents: dd96761
Author: Neha Narkhede <ne...@gmail.com>
Authored: Wed Mar 13 11:50:59 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Mar 13 11:50:59 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala            |   15 +++++++++------
 core/src/main/scala/kafka/server/KafkaConfig.scala |    2 +-
 2 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c5462864/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 34c5376..f6ee475 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -260,7 +260,7 @@ private[kafka] class Log(val dir: File,
    */
   def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = {
     val messageSetInfo = analyzeAndValidateMessageSet(messages)
-    
+
     // if we have any valid messages, append them to the log
     if(messageSetInfo.count == 0) {
       (-1L, -1L)
@@ -270,7 +270,9 @@ private[kafka] class Log(val dir: File,
 
       try {
         // they are valid, insert them in the log
-        val offsets = lock synchronized {
+        val offsetsAndNumAppendedMessages = lock synchronized {
+          val firstOffset = nextOffset.get
+
           // maybe roll the log if this segment is full
           val segment = maybeRoll(segments.view.last)
           
@@ -312,16 +314,17 @@ private[kafka] class Log(val dir: File,
           
           // advance the log end offset
           nextOffset.set(offsets._2 + 1)
-          
+          val numAppendedMessages = (nextOffset.get - firstOffset).toInt
+
           // return the offset at which the messages were appended
-          offsets
+          (offsets._1, offsets._2, numAppendedMessages)
         }
         
         // maybe flush the log and index
-        maybeFlush(messageSetInfo.count)
+        maybeFlush(offsetsAndNumAppendedMessages._3)
         
         // return the first and last offset
-        offsets
+        (offsetsAndNumAppendedMessages._1, offsetsAndNumAppendedMessages._2)
       } catch {
         case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5462864/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b8970c8..549b4b0 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -110,7 +110,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
 
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
-  val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue))
+  val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 10000, (1, Int.MaxValue))
 
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
   val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt)