You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/01/23 23:09:37 UTC
git commit: KAFKA-727 broker can still expose uncommitted data to a
consumer; reviewed by Neha Narkhede
Updated Branches:
refs/heads/0.8 22bde135a -> 311a5d81d
KAFKA-727 broker can still expose uncommitted data to a consumer; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/311a5d81
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/311a5d81
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/311a5d81
Branch: refs/heads/0.8
Commit: 311a5d81d8a8b34e18e1ad2fdc491661477de83c
Parents: 22bde13
Author: Edward Jay Kreps <jk...@apache.org>
Authored: Wed Jan 23 14:07:19 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Jan 23 14:07:26 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 2 +-
core/src/main/scala/kafka/log/LogSegment.scala | 5 +-
.../src/test/scala/other/kafka/StressTestLog.scala | 109 +++++++++++++++
3 files changed, 113 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/311a5d81/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 560be19..5cd36e0 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -369,7 +369,7 @@ private[kafka] class Log(val dir: File,
* Read a message set from the log.
* startOffset - The logical offset to begin reading at
* maxLength - The maximum number of bytes to read
- * maxOffset - The maximum logical offset to include in the resulting message set
+ * maxOffset - The first offset not included in the read
*/
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
http://git-wip-us.apache.org/repos/asf/kafka/blob/311a5d81/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 4417cff..2e40629 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -86,7 +86,8 @@ class LogSegment(val messageSet: FileMessageSet,
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
if(maxSize == 0)
return MessageSet.Empty
-
+
+ val logSize = messageSet.sizeInBytes // this may change, need to save a consistent copy
val startPosition = translateOffset(startOffset)
// if the start position is already off the end of the log, return MessageSet.Empty
@@ -106,7 +107,7 @@ class LogSegment(val messageSet: FileMessageSet,
val mapping = translateOffset(offset)
val endPosition =
if(mapping == null)
- messageSet.sizeInBytes() // the max offset is off the end of the log, use the end of the file
+ logSize // the max offset is off the end of the log, use the end of the file
else
mapping.position
min(endPosition - startPosition.position, maxSize)
http://git-wip-us.apache.org/repos/asf/kafka/blob/311a5d81/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
new file mode 100644
index 0000000..78e0548
--- /dev/null
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka
+
+import java.util.concurrent.atomic._
+import kafka.common._
+import kafka.message._
+import kafka.log._
+import kafka.utils._
+
+/**
+ * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it
+ * from another thread and checks a few basic assertions until the user kills the process.
+ */
+object StressTestLog {
+ val running = new AtomicBoolean(true)
+
+ def main(args: Array[String]) {
+ val dir = TestUtils.tempDir()
+ val log = new Log(dir,
+ maxLogFileSize = 64*1024*1024,
+ maxMessageSize = Int.MaxValue,
+ flushInterval = Int.MaxValue,
+ rollIntervalMs = Long.MaxValue,
+ needsRecovery = false,
+ maxIndexSize = 1024*1024,
+ time = SystemTime,
+ brokerId = 0)
+ val writer = new WriterThread(log)
+ writer.start()
+ val reader = new ReaderThread(log)
+ reader.start()
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ override def run() = {
+ running.set(false)
+ writer.join()
+ reader.join()
+ Utils.rm(dir)
+ }
+ })
+
+ while(running.get) {
+ println("Reader offset = %d, writer offset = %d".format(reader.offset, writer.offset))
+ Thread.sleep(1000)
+ }
+ }
+
+ abstract class WorkerThread extends Thread {
+ override def run() {
+ try {
+ var offset = 0
+ while(running.get)
+ work()
+ } catch {
+ case e: Exception =>
+ e.printStackTrace()
+ running.set(false)
+ }
+ println(getClass.getName + " exiting...")
+ }
+ def work()
+ }
+
+ class WriterThread(val log: Log) extends WorkerThread {
+ @volatile var offset = 0
+ override def work() {
+ val offsets = log.append(TestUtils.singleMessageSet(offset.toString.getBytes))
+ require(offsets._1 == offset && offsets._2 == offset)
+ offset += 1
+ if(offset % 1000 == 0)
+ Thread.sleep(500)
+ }
+ }
+
+ class ReaderThread(val log: Log) extends WorkerThread {
+ @volatile var offset = 0
+ override def work() {
+ try {
+ log.read(offset, 1024, Some(offset+1)) match {
+ case read: FileMessageSet if read.sizeInBytes > 0 => {
+ val first = read.head
+ require(first.offset == offset, "We should either read nothing or the message we asked for.")
+ require(MessageSet.entrySize(first.message) == read.sizeInBytes, "Expected %d but got %d.".format(MessageSet.entrySize(first.message), read.sizeInBytes))
+ offset += 1
+ }
+ case _ =>
+ }
+ } catch {
+ case e: OffsetOutOfRangeException => // this is okay
+ }
+ }
+ }
+}
\ No newline at end of file