Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/02 21:50:03 UTC

svn commit: r1416253 [2/2] - in /kafka/trunk: core/src/main/scala/kafka/log/ core/src/main/scala/kafka/message/ core/src/main/scala/kafka/producer/ core/src/main/scala/kafka/server/ core/src/main/scala/kafka/tools/ core/src/main/scala/kafka/utils/ core...

Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala Sun Dec  2 20:50:01 2012
@@ -35,13 +35,21 @@ class FileMessageSetTest extends BaseMes
     set
   }
 
+  /**
+   * Test that the cached size variable matches the actual file size as we append messages
+   */
   @Test
   def testFileSize() {
     assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
-    messageSet.append(singleMessageSet("abcd".getBytes()))
-    assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
+    for(i <- 0 until 20) {
+      messageSet.append(singleMessageSet("abcd".getBytes))
+      assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
+    } 
   }
   
+  /**
+   * Test that adding invalid bytes to the end of the log doesn't break iteration
+   */
   @Test
   def testIterationOverPartialAndTruncation() {
     testPartialWrite(0, messageSet)
@@ -62,6 +70,9 @@ class FileMessageSetTest extends BaseMes
     checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
   }
   
+  /**
+   * Iterating over the file does file reads but shouldn't change the position of the underlying FileChannel.
+   */
   @Test
   def testIterationDoesntChangePosition() {
     val position = messageSet.channel.position
@@ -69,39 +80,71 @@ class FileMessageSetTest extends BaseMes
     assertEquals(position, messageSet.channel.position)
   }
   
+  /**
+   * Test a simple append and read.
+   */
   @Test
   def testRead() {
-    val read = messageSet.read(0, messageSet.sizeInBytes)
+    var read = messageSet.read(0, messageSet.sizeInBytes)
     checkEquals(messageSet.iterator, read.iterator)
     val items = read.iterator.toList
     val sec = items.tail.head
-    val read2 = messageSet.read(MessageSet.entrySize(sec.message), messageSet.sizeInBytes)
-    checkEquals(items.tail.iterator, read2.iterator)
+    read = messageSet.read(position = MessageSet.entrySize(sec.message), size = messageSet.sizeInBytes)
+    assertEquals("Try a read starting from the second message", items.tail, read.toList)
+    read = messageSet.read(MessageSet.entrySize(sec.message), MessageSet.entrySize(sec.message))
+    assertEquals("Try a read of a single message starting from the second message", List(items.tail.head), read.toList)
   }
   
+  /**
+   * Test the MessageSet.searchFor API.
+   */
   @Test
   def testSearch() {
     // append a new message with a high offset
     val lastMessage = new Message("test".getBytes)
     messageSet.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(50), lastMessage))
-    var physicalOffset = 0
+    var position = 0
     assertEquals("Should be able to find the first message by its offset", 
-                 OffsetPosition(0L, physicalOffset), 
+                 OffsetPosition(0L, position), 
                  messageSet.searchFor(0, 0))
-    physicalOffset += MessageSet.entrySize(messageSet.head.message)
+    position += MessageSet.entrySize(messageSet.head.message)
     assertEquals("Should be able to find second message when starting from 0", 
-                 OffsetPosition(1L, physicalOffset), 
+                 OffsetPosition(1L, position), 
                  messageSet.searchFor(1, 0))
     assertEquals("Should be able to find second message starting from its offset", 
-                 OffsetPosition(1L, physicalOffset), 
-                 messageSet.searchFor(1, physicalOffset))
-    physicalOffset += MessageSet.entrySize(messageSet.tail.head.message)
-    assertEquals("Should be able to find third message from a non-existant offset", 
-                 OffsetPosition(50L, physicalOffset), 
-                 messageSet.searchFor(3, physicalOffset))
-    assertEquals("Should be able to find third message by correct offset", 
-                 OffsetPosition(50L, physicalOffset), 
-                 messageSet.searchFor(50, physicalOffset))
+                 OffsetPosition(1L, position), 
+                 messageSet.searchFor(1, position))
+    position += MessageSet.entrySize(messageSet.tail.head.message) + MessageSet.entrySize(messageSet.tail.tail.head.message)
+    assertEquals("Should be able to find fourth message from a non-existant offset", 
+                 OffsetPosition(50L, position), 
+                 messageSet.searchFor(3, position))
+    assertEquals("Should be able to find fourth message by correct offset", 
+                 OffsetPosition(50L, position), 
+                 messageSet.searchFor(50, position))
+  }
+  
+  /**
+   * Test that the message set iterator obeys start and end slicing
+   */
+  @Test
+  def testIteratorWithLimits() {
+    val message = messageSet.toList(1)
+    val start = messageSet.searchFor(1, 0).position
+    val size = message.message.size
+    val slice = messageSet.read(start, size)
+    assertEquals(List(message), slice.toList)
+  }
+  
+  /**
+   * Test the truncateTo method lops off messages and appropriately updates the size
+   */
+  @Test
+  def testTruncate() {
+    val message = messageSet.toList(0)
+    val end = messageSet.searchFor(1, 0).position
+    messageSet.truncateTo(end)
+    assertEquals(List(message), messageSet.toList)
+    assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes)
   }
   
 }
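
The new search and slice assertions above rely on two FileMessageSet operations: searchFor(targetOffset, startingPosition) scans forward from a physical position and returns the OffsetPosition of the first message whose offset is at least the target, and read(position, size) returns a view over that byte range. A minimal sketch of the combined usage, assuming a populated kafka.log.FileMessageSet (the sliceFrom helper is hypothetical, not part of this patch):

    import kafka.log.{FileMessageSet, OffsetPosition}

    // Locate the first message with offset >= targetOffset and return a sliced
    // view from its physical position to the end of the set.
    def sliceFrom(log: FileMessageSet, targetOffset: Long): FileMessageSet = {
      val OffsetPosition(_, position) = log.searchFor(targetOffset, 0)
      log.read(position, log.sizeInBytes - position)
    }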

Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Sun Dec  2 20:50:01 2012
@@ -59,6 +59,9 @@ class LogManagerTest extends JUnit3Suite
     super.tearDown()
   }
   
+  /**
+   * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
+   */
   @Test
   def testCreateLog() {
     val log = logManager.getOrCreateLog(name, 0)
@@ -67,29 +70,34 @@ class LogManagerTest extends JUnit3Suite
     log.append(TestUtils.singleMessageSet("test".getBytes()))
   }
 
+  /**
+   * Test that get on a non-existent log returns None and no log is created.
+   */
   @Test
-  def testGetLog() {
+  def testGetNonExistentLog() {
     val log = logManager.getLog(name, 0)
+    assertEquals("No log should be found.", None, log)
     val logFile = new File(config.logDirs(0), name + "-0")
     assertTrue(!logFile.exists)
   }
 
+  /**
+   * Test time-based log cleanup. First append messages, then set the time into the future and run cleanup.
+   */
   @Test
   def testCleanupExpiredSegments() {
     val log = logManager.getOrCreateLog(name, 0)
     var offset = 0L
     for(i <- 0 until 1000) {
       var set = TestUtils.singleMessageSet("test".getBytes())
-      val (start, end) = log.append(set)
-      offset = end
+      val info = log.append(set)
+      offset = info.lastOffset
     }
-    log.flush
 
     assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
 
     // update the last modified time of all log segments
-    val logSegments = log.segments.view
-    logSegments.foreach(_.messageSet.file.setLastModified(time.currentMs))
+    log.logSegments.foreach(_.log.file.setLastModified(time.currentMs))
 
     time.currentMs += maxLogAgeHours*60*60*1000 + 1
     logManager.cleanupLogs()
@@ -106,6 +114,9 @@ class LogManagerTest extends JUnit3Suite
     log.append(TestUtils.singleMessageSet("test".getBytes()))
   }
 
+  /**
+   * Test size-based cleanup. Append messages, then run cleanup and check that segments are deleted.
+   */
   @Test
   def testCleanupSegmentsToMaintainSize() {
     val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
@@ -130,11 +141,9 @@ class LogManagerTest extends JUnit3Suite
     // add a bunch of messages that should be larger than the retentionSize
     for(i <- 0 until 1000) {
       val set = TestUtils.singleMessageSet("test".getBytes())
-      val (start, end) = log.append(set)
-      offset = start
+      val info = log.append(set)
+      offset = info.firstOffset
     }
-    // flush to make sure it's written to disk
-    log.flush
 
     // should be exactly 100 full segments + 1 new empty one
     assertEquals("There should be example 100 segments.", 100, log.numberOfSegments)
@@ -153,29 +162,33 @@ class LogManagerTest extends JUnit3Suite
     log.append(TestUtils.singleMessageSet("test".getBytes()))
   }
 
+  /**
+   * Test that flush is invoked by the background scheduler thread.
+   */
   @Test
   def testTimeBasedFlush() {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
     config = new KafkaConfig(props) {
-                   override val logFileSize = 1024 *1024 *1024
-                   override val flushSchedulerThreadRate = 50
+                   override val flushSchedulerThreadRate = 5
+                   override val defaultFlushIntervalMs = 5
                    override val flushInterval = Int.MaxValue
-                   override val logRollHours = maxRollInterval
-                   override val flushIntervalMap = Map("timebasedflush" -> 100)
                  }
-    logManager = new LogManager(config, scheduler, time)
+    logManager = new LogManager(config, scheduler, SystemTime)
     logManager.startup
     val log = logManager.getOrCreateLog(name, 0)
+    val lastFlush = log.lastFlushTime
     for(i <- 0 until 200) {
       var set = TestUtils.singleMessageSet("test".getBytes())
       log.append(set)
     }
-    val ellapsed = System.currentTimeMillis - log.getLastFlushedTime
-    assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed),
-                     ellapsed < 2*config.flushSchedulerThreadRate)
+    Thread.sleep(config.flushSchedulerThreadRate)
+    assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime)
   }
   
+  /**
+   * Test that new logs that are created are assigned to the least loaded log directory
+   */
   @Test
   def testLeastLoadedAssignment() {
     // create a log manager with multiple data directories
@@ -196,6 +209,9 @@ class LogManagerTest extends JUnit3Suite
     }
   }
   
+  /**
+   * Test that it is not possible to open two log managers using the same data directory
+   */
   def testTwoLogManagersUsingSameDirFails() {
     try {
       new LogManager(logManager.config, scheduler, time)
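
Several tests above switch from destructuring a (start, end) tuple to reading firstOffset/lastOffset from the info object that Log.append now returns. A minimal sketch of the new calling convention, assuming only the fields used in the tests above (appendAndReport is a hypothetical helper):

    import kafka.log.Log
    import kafka.message.ByteBufferMessageSet

    // Append a message set and report the offset range assigned to it.
    def appendAndReport(log: Log, set: ByteBufferMessageSet) {
      val info = log.append(set)
      println("appended offsets %d to %d".format(info.firstOffset, info.lastOffset))
    }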

Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala Sun Dec  2 20:50:01 2012
@@ -2,6 +2,9 @@ package kafka.log
 
 import junit.framework.Assert._
 import java.util.concurrent.atomic._
+import java.io.File
+import java.io.RandomAccessFile
+import java.util.Random
 import org.junit.{Test, After}
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
@@ -13,6 +16,7 @@ class LogSegmentTest extends JUnit3Suite
   
   val segments = mutable.ArrayBuffer[LogSegment]()
   
+  /* create a segment with the given base offset */
   def createSegment(offset: Long): LogSegment = {
     val msFile = TestUtils.tempFile()
     val ms = new FileMessageSet(msFile)
@@ -24,6 +28,7 @@ class LogSegmentTest extends JUnit3Suite
     seg
   }
   
+  /* create a ByteBufferMessageSet for the given messages starting from the given offset */
   def messages(offset: Long, messages: String*): ByteBufferMessageSet = {
     new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, 
                              offsetCounter = new AtomicLong(offset), 
@@ -34,17 +39,24 @@ class LogSegmentTest extends JUnit3Suite
   def teardown() {
     for(seg <- segments) {
       seg.index.delete()
-      seg.messageSet.delete()
+      seg.log.delete()
     }
   }
   
+  /**
+   * A read on an empty log segment should return null
+   */
   @Test
   def testReadOnEmptySegment() {
     val seg = createSegment(40)
     val read = seg.read(startOffset = 40, maxSize = 300, maxOffset = None)
-    assertEquals(0, read.size)
+    assertNull("Read beyond the last offset in the segment should be null", read)
   }
   
+  /**
+   * Reading from before the first offset in the segment should return messages
+   * beginning with the first message in the segment
+   */
   @Test
   def testReadBeforeFirstOffset() {
     val seg = createSegment(40)
@@ -54,24 +66,40 @@ class LogSegmentTest extends JUnit3Suite
     assertEquals(ms.toList, read.toList)
   }
   
-  @Test
-  def testReadSingleMessage() {
-    val seg = createSegment(40)
-    val ms = messages(50, "hello", "there")
-    seg.append(50, ms)
-    val read = seg.read(startOffset = 41, maxSize = 200, maxOffset = Some(50))
-    assertEquals(new Message("hello".getBytes), read.head.message)
-  }
-  
+  /**
+   * If we set the startOffset and maxOffset for the read to be the same value
+   * we should get only the first message in the log
+   */
+  @Test
+  def testMaxOffset() {
+    val baseOffset = 50
+    val seg = createSegment(baseOffset)
+    val ms = messages(baseOffset, "hello", "there", "beautiful")
+    seg.append(baseOffset, ms)
+    def validate(offset: Long) = 
+      assertEquals(ms.filter(_.offset == offset).toList, 
+                   seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).toList)
+    validate(50)
+    validate(51)
+    validate(52)
+  }
+  
+  /**
+   * If we read from an offset beyond the last offset in the segment we should get null
+   */
   @Test
   def testReadAfterLast() {
     val seg = createSegment(40)
     val ms = messages(50, "hello", "there")
     seg.append(50, ms)
     val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
-    assertEquals(0, read.size)
+    assertNull("Read beyond the last offset in the segment should give null", null)
   }
   
+  /**
+   * If we read from an offset which doesn't exist we should get a message set beginning
+   * with the least offset greater than the given startOffset.
+   */
   @Test
   def testReadFromGap() {
     val seg = createSegment(40)
@@ -83,6 +111,10 @@ class LogSegmentTest extends JUnit3Suite
     assertEquals(ms2.toList, read.toList)
   }
   
+  /**
+   * In a loop append two messages then truncate off the second of those messages and check that we can read
+   * the first but not the second message.
+   */
   @Test
   def testTruncate() {
     val seg = createSegment(40)
@@ -93,26 +125,33 @@ class LogSegmentTest extends JUnit3Suite
       val ms2 = messages(offset+1, "hello")
       seg.append(offset+1, ms2)
       // check that we can read back both messages
-      val read = seg.read(offset, 10000, None)
+      val read = seg.read(offset, None, 10000)
       assertEquals(List(ms1.head, ms2.head), read.toList)
       // now truncate off the last message
       seg.truncateTo(offset + 1)
-      val read2 = seg.read(offset, 10000, None)
+      val read2 = seg.read(offset, None, 10000)
       assertEquals(1, read2.size)
       assertEquals(ms1.head, read2.head)
       offset += 1
     }
   }
   
+  /**
+   * Test truncating the whole segment, and check that we can reappend with the original offset.
+   */
   @Test
   def testTruncateFull() {
     // test the case where we fully truncate the log
     val seg = createSegment(40)
     seg.append(40, messages(40, "hello", "there"))
     seg.truncateTo(0)
+    assertNull("Segment should be empty.", seg.read(0, None, 1024))
     seg.append(40, messages(40, "hello", "there"))    
   }
   
+  /**
+   * Test that offsets are assigned sequentially and that the nextOffset variable is incremented
+   */
   @Test
   def testNextOffsetCalculation() {
     val seg = createSegment(40)
@@ -121,4 +160,50 @@ class LogSegmentTest extends JUnit3Suite
     assertEquals(53, seg.nextOffset())
   }
   
+  /**
+   * Create a segment with some data and an index. Then corrupt the index,
+   * and recover the segment; the entries should all be readable.
+   */
+  @Test
+  def testRecoveryFixesCorruptIndex() {
+    val seg = createSegment(0)
+    for(i <- 0 until 100)
+      seg.append(i, messages(i, i.toString))
+    val indexFile = seg.index.file
+    writeNonsense(indexFile, 5, indexFile.length.toInt)
+    seg.recover(64*1024)
+    for(i <- 0 until 100)
+      assertEquals(i, seg.read(i, Some(i+1), 1024).head.offset)
+  }
+  
+  /**
+   * Randomly corrupt a log a number of times and attempt recovery.
+   */
+  @Test
+  def testRecoveryWithCorruptMessage() {
+    val rand = new Random(1)
+    val messagesAppended = 20
+    for(iteration <- 0 until 10) {
+      val seg = createSegment(0)
+      for(i <- 0 until messagesAppended)
+        seg.append(i, messages(i, i.toString))
+      val offsetToBeginCorruption = rand.nextInt(messagesAppended)
+      // start corrupting somewhere in the middle of the chosen record all the way to the end
+      val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15)
+      writeNonsense(seg.log.file, position, seg.log.file.length.toInt - position)
+      seg.recover(64*1024)
+      assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)
+      seg.delete()
+    }
+  }
+  
+  def writeNonsense(fileName: File, position: Long, size: Int) {
+    val file = new RandomAccessFile(fileName, "rw")
+    file.seek(position)
+    val rand = new Random
+    for(i <- 0 until size)
+      file.writeByte(rand.nextInt(255))
+    file.close()
+  }
+  
 }
\ No newline at end of file
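
Taken together, the read assertions above pin down the LogSegment.read contract: null when the start offset lies beyond the last message, otherwise messages beginning at the least offset at or above the requested one. A minimal sketch of that contract, reusing the createSegment and messages helpers defined in this test (the offset values are illustrative):

    val seg = createSegment(40)
    seg.append(50, messages(50, "hello", "there"))
    // a read past the last offset yields null
    assert(seg.read(startOffset = 52, maxSize = 200, maxOffset = None) == null)
    // a read from a gap yields the first message at or after the start offset
    assert(seg.read(startOffset = 45, maxSize = 200, maxOffset = None).head.offset == 50)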

Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Sun Dec  2 20:50:01 2012
@@ -54,7 +54,10 @@ class LogTest extends JUnitSuite {
     }
   }
 
-  /** Test that the size and time based log segment rollout works. */
+  /**
+   * Test for time-based log roll. This test appends messages, then changes the time
+   * using the mock clock to force the log to roll, and checks the number of segments.
+   */
   @Test
   def testTimeBasedLogRoll() {
     val set = TestUtils.singleMessageSet("test".getBytes())
@@ -70,27 +73,26 @@ class LogTest extends JUnitSuite {
     assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
 
     log.append(set)
-    assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
-
-    // segment expires in age
-    time.currentMs += rollMs + 1
-    log.append(set)
-    assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
+    assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments)
 
-    time.currentMs += rollMs + 1
-    val blank = Array[Message]()
-    log.append(new ByteBufferMessageSet(new Message("blah".getBytes)))
-    assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+    for(numSegments <- 2 until 4) {
+      time.currentMs += rollMs + 1
+      log.append(set)
+      assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
+    }
 
+    val numSegments = log.numberOfSegments
     time.currentMs += rollMs + 1
-    // the last segment expired in age, but was blank. So new segment should not be generated
     log.append(new ByteBufferMessageSet())
-    assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+    assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments)
   }
 
+  /**
+   * Test that appending more than the maximum segment size rolls the log
+   */
   @Test
   def testSizeBasedLogRoll() {
-    val set = TestUtils.singleMessageSet("test".getBytes())
+    val set = TestUtils.singleMessageSet("test".getBytes)
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
     val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
@@ -106,28 +108,80 @@ class LogTest extends JUnitSuite {
     assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
   }
 
+  /**
+   * Test that we can open and append to an empty log
+   */
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    log.append(TestUtils.singleMessageSet("test".getBytes))
   }
 
+  /**
+   * This test case appends a bunch of messages and checks that we can read them all back using sequential offsets.
+   */
   @Test
-  def testAppendAndRead() {
-    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-    val message = new Message(Integer.toString(42).getBytes())
-    for(i <- 0 until 10)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
-    log.flush()
-    val messages = log.read(0, 1024)
-    var current = 0
-    for(curr <- messages) {
-      assertEquals("Read message should equal written", message, curr.message)
-      current += 1
+  def testAppendAndReadWithSequentialOffsets() {
+    val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
+    
+    for(i <- 0 until messages.length)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
+    for(i <- 0 until messages.length) {
+      val read = log.read(i, 100, Some(i+1)).head
+      assertEquals("Offset read should match order appended.", i, read.offset)
+      assertEquals("Message should match appended.", messages(i), read.message)
     }
-    assertEquals(10, current)
+    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size)
   }
-
+  
+  /**
+   * This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message
+   * from any offset less than the logEndOffset including offsets not appended.
+   */
+  @Test
+  def testAppendAndReadWithNonSequentialOffsets() {
+    val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
+    val messages = messageIds.map(id => new Message(id.toString.getBytes))
+    
+    // now test the case that we give the offsets and use non-sequential offsets
+    for(i <- 0 until messages.length)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
+    for(i <- 50 until messageIds.max) {
+      val idx = messageIds.indexWhere(_ >= i)
+      val read = log.read(i, 100, None).head
+      assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
+      assertEquals("Message should match appended.", messages(idx), read.message)
+    }
+  }
+  
+  /**
+   * This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment.
+   * Specifically we create a log where the last message in the first segment has offset 0. If we
+   * then read offset 1, we should expect this read to come from the second segment, even though the 
+   * first segment has the greatest lower bound on the offset.
+   */
+  @Test
+  def testReadAtLogGap() {
+    val log = new Log(logDir, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    
+    // keep appending until we have two segments with only a single message in the second segment
+    while(log.numberOfSegments == 1)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes))) 
+    
+    // now manually truncate off all but one message from the first segment to create a gap in the messages
+    log.logSegments.head.truncateTo(1)
+    
+    assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset)
+  }
+  
+  /**
+   * Test reading at the boundary of the log, specifically
+   * - reading from the logEndOffset should give an empty message set
+   * - reading beyond the log end offset should throw an OffsetOutOfRangeException
+   */
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
@@ -147,14 +201,17 @@ class LogTest extends JUnitSuite {
     }
   }
 
-  /** Test that writing and reading beyond the log size boundary works */
+  /**
+   * Test that covers reads and writes on a multisegment log. This test appends a bunch of messages
+   * and then reads them all back and checks that the message read and offset matches what was appended.
+   */
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
     val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
-    val offsets = messageSets.map(log.append(_)._1)
+    val offsets = messageSets.map(log.append(_).firstOffset)
     log.flush
 
     /* do successive reads to ensure all our messages are there */
@@ -169,7 +226,9 @@ class LogTest extends JUnitSuite {
     assertEquals("Should be no more messages", 0, lastRead.size)
   }
   
-  /** Test the case where we have compressed batches of messages */
+  /**
+   * Test reads at offsets that fall within compressed message set boundaries.
+   */
   @Test
   def testCompressedMessages() {
     /* this log should roll after every messageset */
@@ -187,66 +246,46 @@ class LogTest extends JUnitSuite {
     assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
     assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
   }
-
-  @Test
-  def testFindSegment() {
-    assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45))
-    assertEquals("Search in segment list just outside the range of the last segment should find last segment",
-                 9, Log.findRange(makeRanges(5, 9, 12), 12).get.start)
-    assertEquals("Search in segment list far outside the range of the last segment should find last segment",
-                 9, Log.findRange(makeRanges(5, 9, 12), 100).get.start)
-    assertEquals("Search in segment list far outside the range of the last segment should find last segment",
-                 None, Log.findRange(makeRanges(5, 9, 12), -1))
-    assertContains(makeRanges(5, 9, 12), 11)
-    assertContains(makeRanges(5), 4)
-    assertContains(makeRanges(5,8), 5)
-    assertContains(makeRanges(5,8), 6)
-  }
   
-  @Test
-  def testEdgeLogRollsStartingAtZero() {
-    // first test a log segment starting at 0
-    val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-    val curOffset = log.logEndOffset
-    assertEquals(curOffset, 0)
-
-    // time goes by; the log file is deleted
-    log.markDeletedWhile(_ => true)
-
-    // we now have a new log; the starting offset of the new log should remain 0
-    assertEquals(curOffset, log.logEndOffset)
-    log.delete()
-  }
-
-  @Test
-  def testEdgeLogRollsStartingAtNonZero() {
-    // second test an empty log segment starting at non-zero
-    val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-    val numMessages = 1
-    for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(i.toString.getBytes))
-    val curOffset = log.logEndOffset
-    
-    // time goes by; the log file is deleted
-    log.markDeletedWhile(_ => true)
-
-    // we now have a new log
-    assertEquals(curOffset, log.logEndOffset)
-
-    // time goes by; the log file (which is empty) is deleted again
-    val deletedSegments = log.markDeletedWhile(_ => true)
-
-    // we shouldn't delete the last empty log segment.
-    assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0)
-
-    // we now have a new log
-    assertEquals(curOffset, log.logEndOffset)
+  /**
+   * Test garbage collecting old segments
+   */
+  @Test
+  def testThatGarbageCollectingSegmentsDoesntChangeOffset() {
+    for(messagesToAppend <- List(0, 1, 25)) {
+      logDir.mkdirs()
+      // first test a log segment starting at 0
+      val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+      for(i <- 0 until messagesToAppend)
+        log.append(TestUtils.singleMessageSet(i.toString.getBytes))
+      
+      var currOffset = log.logEndOffset
+      assertEquals(currOffset, messagesToAppend)
+
+      // time goes by; the log file is deleted
+      log.deleteOldSegments(_ => true)
+
+      assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset)
+      assertEquals("We should still have one segment left", 1, log.numberOfSegments)
+      assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true))
+      assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
+      assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", 
+                   currOffset,
+                   log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
+      
+      // cleanup the log
+      log.delete()
+    }
   }
 
+  /**
+   * We have a max size limit on message appends; check that it is properly enforced by appending a message larger than the 
+   * setting and checking that an exception is thrown.
+   */
   @Test
   def testMessageSizeCheck() {
-    val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes()))
-    val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
+    val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes))
+    val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes))
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
@@ -259,10 +298,13 @@ class LogTest extends JUnitSuite {
       log.append(second)
       fail("Second message set should throw MessageSizeTooLargeException.")
     } catch {
-        case e:MessageSizeTooLargeException => // this is good
+        case e: MessageSizeTooLargeException => // this is good
     }
   }
   
+  /**
+   * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly.
+   */
   @Test
   def testLogRecoversToCorrectOffset() {
     val numMessages = 100
@@ -273,25 +315,53 @@ class LogTest extends JUnitSuite {
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
     assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
-    val lastIndexOffset = log.segments.view.last.index.lastOffset
-    val numIndexEntries = log.segments.view.last.index.entries
+    val lastIndexOffset = log.activeSegment.index.lastOffset
+    val numIndexEntries = log.activeSegment.index.entries
     log.close()
     
     // test non-recovery case
     log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
     assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
-    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
-    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
+    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
+    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
     log.close()
     
-    // test 
+    // test recovery case
     log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
     assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
-    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
-    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
+    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
+    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
+    log.close()
+  }
+  
+  /**
+   * Test that if we manually delete the index files they are rebuilt when the log is re-opened
+   */
+  @Test
+  def testIndexRebuild() {
+    // publish the messages and close the log
+    val numMessages = 200
+    var log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
+    for(i <- 0 until numMessages)
+      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
+    val indexFiles = log.logSegments.map(_.index.file)
+    log.close()
+    
+    // delete all the index files
+    indexFiles.foreach(_.delete())
+    
+    // reopen the log
+    log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
+    
+    assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
+    for(i <- 0 until numMessages)
+      assertEquals(i, log.read(i, 100, None).head.offset)
     log.close()
   }
 
+  /**
+   * Test the Log truncate operations
+   */
   @Test
   def testTruncateTo() {
     val set = TestUtils.singleMessageSet("test".getBytes())
@@ -329,7 +399,7 @@ class LogTest extends JUnitSuite {
     
     assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
     assertEquals("Should be back to original size", log.size, size)
-    log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1))
+    log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1))
     assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1))
     assertEquals("Should change log size", log.size, 0)
 
@@ -343,6 +413,9 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change log size", log.size, 0)
   }
 
+  /**
+   * Verify that when we truncate a log the index of the last segment is resized to the max index size to allow more appends
+   */
   @Test
   def testIndexResizingAtTruncation() {
     val set = TestUtils.singleMessageSet("test".getBytes())
@@ -357,48 +430,25 @@ class LogTest extends JUnitSuite {
     for (i<- 1 to msgPerSeg)
       log.append(set)
     assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
-    assertEquals("The index of the first segment should be trim to empty", 0, log.segments.view(0).index.maxEntries)
+    assertEquals("The index of the first segment should be trim to empty", 0, log.logSegments.toList(0).index.maxEntries)
     log.truncateTo(0)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
-    assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8, log.segments.view(0).index.maxEntries)
+    assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8, log.logSegments.toList(0).index.maxEntries)
     for (i<- 1 to msgPerSeg)
       log.append(set)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
   }
 
-
-  @Test
-  def testAppendWithoutOffsetAssignment() {
-    for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
-      logDir.mkdir()
-      var log = new Log(logDir, 
-                        maxLogFileSize = 64*1024, 
-                        maxMessageSize = config.maxMessageSize, 
-                        maxIndexSize = 1000, 
-                        indexIntervalBytes = 10000, 
-                        needsRecovery = true)
-      val messages = List("one", "two", "three", "four", "five", "six")
-      val ms = new ByteBufferMessageSet(compressionCodec = codec, 
-                                        offsetCounter = new AtomicLong(5), 
-                                        messages = messages.map(s => new Message(s.getBytes)):_*)
-      val firstOffset = ms.shallowIterator.toList.head.offset
-      val lastOffset = ms.shallowIterator.toList.last.offset
-      val (first, last) = log.append(ms, assignOffsets = false)
-      assertEquals(last + 1, log.logEndOffset)
-      assertEquals(firstOffset, first)
-      assertEquals(lastOffset, last)
-      assertTrue(log.read(5, 64*1024).size > 0)
-      log.delete()
-    }
-  }
-
+  /**
+   * Verify that truncation works correctly after re-opening the log
+   */
   @Test
   def testReopenThenTruncate() {
     val set = TestUtils.singleMessageSet("test".getBytes())
 
     // create a log
     var log = new Log(logDir, 
-                      maxLogFileSize = set.sizeInBytes * 5, 
+                      maxSegmentSize = set.sizeInBytes * 5, 
                       maxMessageSize = config.maxMessageSize, 
                       maxIndexSize = 1000, 
                       indexIntervalBytes = 10000, 
@@ -409,7 +459,7 @@ class LogTest extends JUnitSuite {
       log.append(set)
     log.close()
     log = new Log(logDir, 
-                  maxLogFileSize = set.sizeInBytes * 5, 
+                  maxSegmentSize = set.sizeInBytes * 5, 
                   maxMessageSize = config.maxMessageSize, 
                   maxIndexSize = 1000, 
                   indexIntervalBytes = 10000, 
@@ -419,24 +469,4 @@ class LogTest extends JUnitSuite {
     assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
   }
   
-  def assertContains(ranges: Array[Range], offset: Long) = {
-    Log.findRange(ranges, offset) match {
-      case Some(range) => 
-        assertTrue(range + " does not contain " + offset, range.contains(offset))
-      case None => fail("No range found, but expected to find " + offset)
-    }
-  }
-  
-  class SimpleRange(val start: Long, val size: Long) extends Range
-  
-  def makeRanges(breaks: Int*): Array[Range] = {
-    val list = new ArrayList[Range]
-    var prior = 0
-    for(brk <- breaks) {
-      list.add(new SimpleRange(prior, brk - prior))
-      prior = brk
-    }
-    list.toArray(new Array[Range](list.size))
-  }
-  
 }
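
The truncation tests above distinguish two operations with different semantics: truncateTo(targetOffset) drops messages with offsets at or above the target but keeps the log in the same offset space, while truncateFullyAndStartAt(newOffset), renamed from truncateAndStartWithNewOffset in this patch, empties the log and moves logEndOffset to the given value. A minimal sketch under those semantics (resetLog is a hypothetical helper):

    import kafka.log.Log

    // Empty a log and restart its offset sequence at newStart.
    def resetLog(log: Log, newStart: Long) {
      log.truncateFullyAndStartAt(newStart)
      assert(log.logEndOffset == newStart, "log end offset should move to the requested start")
      assert(log.size == 0, "a fully truncated log should contain no bytes")
    }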

Modified: kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala Sun Dec  2 20:50:01 2012
@@ -26,7 +26,7 @@ import org.junit.Test
 
 trait BaseMessageSetTestCases extends JUnitSuite {
   
-  val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
+  val messages = Array(new Message("abcd".getBytes), new Message("efgh".getBytes), new Message("ijkl".getBytes))
   
   def createMessageSet(messages: Seq[Message]): MessageSet
 

Modified: kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Sun Dec  2 20:50:01 2012
@@ -397,6 +397,7 @@ class AsyncProducerTest extends JUnit3Su
     props.put("serializer.class", classOf[StringEncoder].getName.toString)
     props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+    props.put("producer.num.retries", 3.toString)
 
     val config = new ProducerConfig(props)
 

Modified: kafka/trunk/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Sun Dec  2 20:50:01 2012
@@ -40,7 +40,7 @@ class SimpleFetchTest extends JUnit3Suit
   val partitionId = 0
 
   /**
-   * The scenario for this test is that there is one topic, "test-topic", on broker "0" that has
+   * The scenario for this test is that there is one topic, "test-topic", on one broker "0" that has
    * one  partition with one follower replica on broker "1".  The leader replica on "0"
    * has HW of "5" and LEO of "20".  The follower on broker "1" has a local replica
    * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
@@ -62,6 +62,7 @@ class SimpleFetchTest extends JUnit3Suit
 
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
+    EasyMock.expect(log)
     EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(new ByteBufferMessageSet(messages))
     EasyMock.replay(log)
 
@@ -102,33 +103,6 @@ class SimpleFetchTest extends JUnit3Suit
 
     // make sure the log only reads bytes between 0->HW (5)
     EasyMock.verify(log)
-
-    // Test offset request from non-replica
-    val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
-    val offsetRequest = OffsetRequest(
-      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
-    val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
-
-    EasyMock.reset(logManager)
-    EasyMock.reset(replicaManager)
-
-    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
-    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
-    EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
-
-    EasyMock.replay(replicaManager)
-    EasyMock.replay(logManager)
-
-    apis.handleOffsetRequest(new RequestChannel.Request(processor = 0,
-                                                        requestKey = 5,
-                                                        buffer = offsetRequestBB,
-                                                        startTimeMs = 1))
-    val offsetResponseBuffer = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
-    val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
-    EasyMock.verify(replicaManager)
-    EasyMock.verify(logManager)
-    assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
-    assertEquals(hw.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
   }
 
   /**
@@ -203,34 +177,6 @@ class SimpleFetchTest extends JUnit3Suit
      * an offset of 15
      */
     EasyMock.verify(log)
-
-    // Test offset request from replica
-    val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
-    val offsetRequest = OffsetRequest(
-      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)),
-      replicaId = followerReplicaId)
-    val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
-
-    EasyMock.reset(logManager)
-    EasyMock.reset(replicaManager)
-
-    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
-    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
-    EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
-
-    EasyMock.replay(replicaManager)
-    EasyMock.replay(logManager)
-
-    apis.handleOffsetRequest(new RequestChannel.Request(processor = 1,
-                                                        requestKey = 5,
-                                                        buffer = offsetRequestBB,
-                                                        startTimeMs = 1))
-    val offsetResponseBuffer = requestChannel.receiveResponse(1).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
-    val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
-    EasyMock.verify(replicaManager)
-    EasyMock.verify(logManager)
-    assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
-    assertEquals(leo.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
   }
 
   private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
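
The mock expectation log.read(0, fetchSize, Some(hw)) is the heart of the first scenario: a fetch from an ordinary consumer must be capped at the high watermark, while a follower replica may read up to the log end offset. A minimal sketch of that distinction, assuming the Log.read(startOffset, maxSize, maxOffset) signature used in these tests (readMessages is a hypothetical helper):

    import kafka.log.Log
    import kafka.message.MessageSet

    // Replica fetches may read to the log end; consumer fetches stop at the HW.
    def readMessages(log: Log, offset: Long, fetchSize: Int,
                     fromReplica: Boolean, hw: Long): MessageSet =
      if (fromReplica) log.read(offset, fetchSize, None)
      else log.read(offset, fetchSize, Some(hw))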

Modified: kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sun Dec  2 20:50:01 2012
@@ -127,7 +127,6 @@ object TestUtils extends Logging {
     props.put("hostname", "localhost")
     props.put("port", port.toString)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
-    props.put("log.flush.interval", "1")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
     props.put("replica.socket.timeout.ms", "1500")
     props

Modified: kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Sun Dec  2 20:50:01 2012
@@ -33,7 +33,8 @@ class EmbeddedZookeeper(val connectStrin
   factory.startup(zookeeper)
 
   def shutdown() {
-    factory.shutdown()
+    Utils.swallow(zookeeper.shutdown())
+    Utils.swallow(factory.shutdown())
     Utils.rm(logDir)
     Utils.rm(snapshotDir)
   }
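
Both shutdown steps are now wrapped in Utils.swallow so that a failure in one cannot abort the remaining cleanup. A minimal sketch of the idiom (illustrative only; the real helper lives in kafka.utils.Utils and presumably routes the exception to a logger):

    // Run a side-effecting action, swallowing any exception so that the
    // cleanup steps that follow still execute.
    def swallow(action: => Unit) {
      try action
      catch { case e: Throwable => e.printStackTrace() }
    }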

Modified: kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala Sun Dec  2 20:50:01 2012
@@ -19,7 +19,7 @@ package kafka.zk
 
 import org.scalatest.junit.JUnit3Suite
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, TestZKUtils}
+import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}
 
 trait ZooKeeperTestHarness extends JUnit3Suite {
   val zkConnect: String = TestZKUtils.zookeeperConnect
@@ -36,8 +36,8 @@ trait ZooKeeperTestHarness extends JUnit
 
   override def tearDown() {
     super.tearDown
-    zkClient.close()
-    zookeeper.shutdown()
+    Utils.swallow(zkClient.close())
+    Utils.swallow(zookeeper.shutdown())
   }
 
 }

Modified: kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java
URL: http://svn.apache.org/viewvc/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java (original)
+++ kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java Sun Dec  2 20:50:01 2012
@@ -25,7 +25,6 @@ import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.Message;
 
 
 public class Consumer extends Thread