You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/01/23 23:09:37 UTC

git commit: KAFKA-727 broker can still expose uncommitted data to a consumer; reviewed by Neha Narkhede

Updated Branches:
  refs/heads/0.8 22bde135a -> 311a5d81d


KAFKA-727 broker can still expose uncommitted data to a consumer; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/311a5d81
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/311a5d81
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/311a5d81

Branch: refs/heads/0.8
Commit: 311a5d81d8a8b34e18e1ad2fdc491661477de83c
Parents: 22bde13
Author: Edward Jay Kreps <jk...@apache.org>
Authored: Wed Jan 23 14:07:19 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Jan 23 14:07:26 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala            |    2 +-
 core/src/main/scala/kafka/log/LogSegment.scala     |    5 +-
 .../src/test/scala/other/kafka/StressTestLog.scala |  109 +++++++++++++++
 3 files changed, 113 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/311a5d81/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 560be19..5cd36e0 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -369,7 +369,7 @@ private[kafka] class Log(val dir: File,
    * Read a message set from the log. 
    * startOffset - The logical offset to begin reading at
    * maxLength - The maximum number of bytes to read
-   * maxOffset - The maximum logical offset to include in the resulting message set
+   * maxOffset - The first offset not included in the read
    */
   def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = {
     trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))

http://git-wip-us.apache.org/repos/asf/kafka/blob/311a5d81/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 4417cff..2e40629 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -86,7 +86,8 @@ class LogSegment(val messageSet: FileMessageSet,
       throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
     if(maxSize == 0)
       return MessageSet.Empty
-      
+    
+    val logSize = messageSet.sizeInBytes // this may change, need to save a consistent copy
     val startPosition = translateOffset(startOffset)
     
     // if the start position is already off the end of the log, return MessageSet.Empty
@@ -106,7 +107,7 @@ class LogSegment(val messageSet: FileMessageSet,
           val mapping = translateOffset(offset)
           val endPosition = 
             if(mapping == null)
-              messageSet.sizeInBytes() // the max offset is off the end of the log, use the end of the file
+              logSize // the max offset is off the end of the log, use the end of the file
             else
               mapping.position
           min(endPosition - startPosition.position, maxSize) 

http://git-wip-us.apache.org/repos/asf/kafka/blob/311a5d81/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
new file mode 100644
index 0000000..78e0548
--- /dev/null
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka
+
+import java.util.concurrent.atomic._
+import kafka.common._
+import kafka.message._
+import kafka.log._
+import kafka.utils._
+
+/**
+ * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it 
+ * from another thread and checks a few basic assertions until the user kills the process.
+ */
+object StressTestLog {
+  val running = new AtomicBoolean(true)
+  
+  def main(args: Array[String]) {
+    val dir = TestUtils.tempDir()
+    val log = new Log(dir, 
+                      maxLogFileSize = 64*1024*1024, 
+                      maxMessageSize = Int.MaxValue, 
+                      flushInterval = Int.MaxValue, 
+                      rollIntervalMs = Long.MaxValue, 
+                      needsRecovery = false,
+                      maxIndexSize = 1024*1024,
+                      time = SystemTime,
+                      brokerId = 0)
+    val writer = new WriterThread(log)
+    writer.start()
+    val reader = new ReaderThread(log)
+    reader.start()
+    
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run() = {
+        running.set(false)
+        writer.join()
+        reader.join()
+        Utils.rm(dir)
+      }
+    })
+    
+    while(running.get) {
+      println("Reader offset = %d, writer offset = %d".format(reader.offset, writer.offset))
+      Thread.sleep(1000)
+    }
+  }
+  
+  abstract class WorkerThread extends Thread {
+    override def run() {
+      try {
+        var offset = 0
+        while(running.get)
+          work()
+      } catch {
+        case e: Exception => 
+          e.printStackTrace()
+          running.set(false)
+      }
+      println(getClass.getName + " exiting...")
+    }
+    def work()
+  }
+  
+  class WriterThread(val log: Log) extends WorkerThread {
+    @volatile var offset = 0
+    override def work() {
+      val offsets = log.append(TestUtils.singleMessageSet(offset.toString.getBytes))
+      require(offsets._1 == offset && offsets._2 == offset)
+      offset += 1
+      if(offset % 1000 == 0)
+        Thread.sleep(500)
+    }
+  }
+  
+  class ReaderThread(val log: Log) extends WorkerThread {
+    @volatile var offset = 0
+    override def work() {
+      try {
+        log.read(offset, 1024, Some(offset+1)) match {
+          case read: FileMessageSet if read.sizeInBytes > 0 => {
+            val first = read.head
+            require(first.offset == offset, "We should either read nothing or the message we asked for.")
+            require(MessageSet.entrySize(first.message) == read.sizeInBytes, "Expected %d but got %d.".format(MessageSet.entrySize(first.message), read.sizeInBytes))
+            offset += 1
+          }
+          case _ =>
+        }
+      } catch {
+        case e: OffsetOutOfRangeException => // this is okay
+      }
+    }
+  }
+}
\ No newline at end of file