You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/02 21:50:03 UTC
svn commit: r1416253 [2/2] - in /kafka/trunk: core/src/main/scala/kafka/log/
core/src/main/scala/kafka/message/ core/src/main/scala/kafka/producer/
core/src/main/scala/kafka/server/ core/src/main/scala/kafka/tools/
core/src/main/scala/kafka/utils/ core...
Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala Sun Dec 2 20:50:01 2012
@@ -35,13 +35,21 @@ class FileMessageSetTest extends BaseMes
set
}
+ /**
+ * Test that the cached size variable matches the actual file size as we append messages
+ */
@Test
def testFileSize() {
assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
- messageSet.append(singleMessageSet("abcd".getBytes()))
- assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
+ for(i <- 0 until 20) {
+ messageSet.append(singleMessageSet("abcd".getBytes))
+ assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
+ }
}
+ /**
+ * Test that adding invalid bytes to the end of the log doesn't break iteration
+ */
@Test
def testIterationOverPartialAndTruncation() {
testPartialWrite(0, messageSet)
@@ -62,6 +70,9 @@ class FileMessageSetTest extends BaseMes
checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
}
+ /**
+ * Iterating over the file does file reads but shouldn't change the position of the underlying FileChannel.
+ */
@Test
def testIterationDoesntChangePosition() {
val position = messageSet.channel.position
@@ -69,39 +80,71 @@ class FileMessageSetTest extends BaseMes
assertEquals(position, messageSet.channel.position)
}
+ /**
+ * Test a simple append and read.
+ */
@Test
def testRead() {
- val read = messageSet.read(0, messageSet.sizeInBytes)
+ var read = messageSet.read(0, messageSet.sizeInBytes)
checkEquals(messageSet.iterator, read.iterator)
val items = read.iterator.toList
val sec = items.tail.head
- val read2 = messageSet.read(MessageSet.entrySize(sec.message), messageSet.sizeInBytes)
- checkEquals(items.tail.iterator, read2.iterator)
+ read = messageSet.read(position = MessageSet.entrySize(sec.message), size = messageSet.sizeInBytes)
+ assertEquals("Try a read starting from the second message", items.tail, read.toList)
+ read = messageSet.read(MessageSet.entrySize(sec.message), MessageSet.entrySize(sec.message))
+ assertEquals("Try a read of a single message starting from the second message", List(items.tail.head), read.toList)
}
+ /**
+ * Test the MessageSet.searchFor API.
+ */
@Test
def testSearch() {
// append a new message with a high offset
val lastMessage = new Message("test".getBytes)
messageSet.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(50), lastMessage))
- var physicalOffset = 0
+ var position = 0
assertEquals("Should be able to find the first message by its offset",
- OffsetPosition(0L, physicalOffset),
+ OffsetPosition(0L, position),
messageSet.searchFor(0, 0))
- physicalOffset += MessageSet.entrySize(messageSet.head.message)
+ position += MessageSet.entrySize(messageSet.head.message)
assertEquals("Should be able to find second message when starting from 0",
- OffsetPosition(1L, physicalOffset),
+ OffsetPosition(1L, position),
messageSet.searchFor(1, 0))
assertEquals("Should be able to find second message starting from its offset",
- OffsetPosition(1L, physicalOffset),
- messageSet.searchFor(1, physicalOffset))
- physicalOffset += MessageSet.entrySize(messageSet.tail.head.message)
- assertEquals("Should be able to find third message from a non-existant offset",
- OffsetPosition(50L, physicalOffset),
- messageSet.searchFor(3, physicalOffset))
- assertEquals("Should be able to find third message by correct offset",
- OffsetPosition(50L, physicalOffset),
- messageSet.searchFor(50, physicalOffset))
+ OffsetPosition(1L, position),
+ messageSet.searchFor(1, position))
+ position += MessageSet.entrySize(messageSet.tail.head.message) + MessageSet.entrySize(messageSet.tail.tail.head.message)
+ assertEquals("Should be able to find fourth message from a non-existant offset",
+ OffsetPosition(50L, position),
+ messageSet.searchFor(3, position))
+ assertEquals("Should be able to find fourth message by correct offset",
+ OffsetPosition(50L, position),
+ messageSet.searchFor(50, position))
+ }
+
+ /**
+ * Test that the message set iterator obeys start and end slicing: a slice of exactly one entry yields that message
+ */
+ @Test
+ def testIteratorWithLimits() {
+ val message = messageSet.toList(1)
+ val start = messageSet.searchFor(1, 0).position
+ val size = MessageSet.entrySize(message.message)
+ val slice = messageSet.read(start, size)
+ assertEquals(List(message), slice.toList)
+ }
+
+ /**
+ * Test the truncateTo method lops off messages and appropriately updates the size
+ */
+ @Test
+ def testTruncate() {
+ val message = messageSet.toList(0)
+ val end = messageSet.searchFor(1, 0).position
+ messageSet.truncateTo(end)
+ assertEquals(List(message), messageSet.toList)
+ assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes)
}
}
Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Sun Dec 2 20:50:01 2012
@@ -59,6 +59,9 @@ class LogManagerTest extends JUnit3Suite
super.tearDown()
}
+ /**
+ * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
+ */
@Test
def testCreateLog() {
val log = logManager.getOrCreateLog(name, 0)
@@ -67,29 +70,34 @@ class LogManagerTest extends JUnit3Suite
log.append(TestUtils.singleMessageSet("test".getBytes()))
}
+ /**
+ * Test that getLog on a non-existent log returns None and no log is created.
+ */
@Test
- def testGetLog() {
+ def testGetNonExistentLog() {
val log = logManager.getLog(name, 0)
+ assertEquals("No log should be found.", None, log)
val logFile = new File(config.logDirs(0), name + "-0")
assertTrue(!logFile.exists)
}
+ /**
+ * Test time-based log cleanup. First append messages, then set the time into the future and run cleanup.
+ */
@Test
def testCleanupExpiredSegments() {
val log = logManager.getOrCreateLog(name, 0)
var offset = 0L
for(i <- 0 until 1000) {
var set = TestUtils.singleMessageSet("test".getBytes())
- val (start, end) = log.append(set)
- offset = end
+ val info = log.append(set)
+ offset = info.lastOffset
}
- log.flush
assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
// update the last modified time of all log segments
- val logSegments = log.segments.view
- logSegments.foreach(_.messageSet.file.setLastModified(time.currentMs))
+ log.logSegments.foreach(_.log.file.setLastModified(time.currentMs))
time.currentMs += maxLogAgeHours*60*60*1000 + 1
logManager.cleanupLogs()
@@ -106,6 +114,9 @@ class LogManagerTest extends JUnit3Suite
log.append(TestUtils.singleMessageSet("test".getBytes()))
}
+ /**
+ * Test size-based cleanup. Append messages, then run cleanup and check that segments are deleted.
+ */
@Test
def testCleanupSegmentsToMaintainSize() {
val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
@@ -130,11 +141,9 @@ class LogManagerTest extends JUnit3Suite
// add a bunch of messages that should be larger than the retentionSize
for(i <- 0 until 1000) {
val set = TestUtils.singleMessageSet("test".getBytes())
- val (start, end) = log.append(set)
- offset = start
+ val info = log.append(set)
+ offset = info.firstOffset
}
- // flush to make sure it's written to disk
- log.flush
// should be exactly 100 full segments + 1 new empty one
assertEquals("There should be example 100 segments.", 100, log.numberOfSegments)
@@ -153,29 +162,33 @@ class LogManagerTest extends JUnit3Suite
log.append(TestUtils.singleMessageSet("test".getBytes()))
}
+ /**
+ * Test that flush is invoked by the background scheduler thread.
+ */
@Test
def testTimeBasedFlush() {
val props = TestUtils.createBrokerConfig(0, -1)
logManager.shutdown()
config = new KafkaConfig(props) {
- override val logFileSize = 1024 *1024 *1024
- override val flushSchedulerThreadRate = 50
+ override val flushSchedulerThreadRate = 5
+ override val defaultFlushIntervalMs = 5
override val flushInterval = Int.MaxValue
- override val logRollHours = maxRollInterval
- override val flushIntervalMap = Map("timebasedflush" -> 100)
}
- logManager = new LogManager(config, scheduler, time)
+ logManager = new LogManager(config, scheduler, SystemTime)
logManager.startup
val log = logManager.getOrCreateLog(name, 0)
+ val lastFlush = log.lastFlushTime
for(i <- 0 until 200) {
var set = TestUtils.singleMessageSet("test".getBytes())
log.append(set)
}
- val ellapsed = System.currentTimeMillis - log.getLastFlushedTime
- assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed),
- ellapsed < 2*config.flushSchedulerThreadRate)
+ Thread.sleep(config.flushSchedulerThreadRate)
+ assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime)
}
+ /**
+ * Test that new logs that are created are assigned to the least loaded log directory
+ */
@Test
def testLeastLoadedAssignment() {
// create a log manager with multiple data directories
@@ -196,6 +209,9 @@ class LogManagerTest extends JUnit3Suite
}
}
+ /**
+ * Test that it is not possible to open two log managers using the same data directory
+ */
def testTwoLogManagersUsingSameDirFails() {
try {
new LogManager(logManager.config, scheduler, time)
Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala Sun Dec 2 20:50:01 2012
@@ -2,6 +2,9 @@ package kafka.log
import junit.framework.Assert._
import java.util.concurrent.atomic._
+import java.io.File
+import java.io.RandomAccessFile
+import java.util.Random
import org.junit.{Test, After}
import org.scalatest.junit.JUnit3Suite
import kafka.utils.TestUtils
@@ -13,6 +16,7 @@ class LogSegmentTest extends JUnit3Suite
val segments = mutable.ArrayBuffer[LogSegment]()
+ /* create a segment with the given base offset */
def createSegment(offset: Long): LogSegment = {
val msFile = TestUtils.tempFile()
val ms = new FileMessageSet(msFile)
@@ -24,6 +28,7 @@ class LogSegmentTest extends JUnit3Suite
seg
}
+ /* create a ByteBufferMessageSet for the given messages starting from the given offset */
def messages(offset: Long, messages: String*): ByteBufferMessageSet = {
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
offsetCounter = new AtomicLong(offset),
@@ -34,17 +39,24 @@ class LogSegmentTest extends JUnit3Suite
def teardown() {
for(seg <- segments) {
seg.index.delete()
- seg.messageSet.delete()
+ seg.log.delete()
}
}
+ /**
+ * A read on an empty log segment should return null
+ */
@Test
def testReadOnEmptySegment() {
val seg = createSegment(40)
val read = seg.read(startOffset = 40, maxSize = 300, maxOffset = None)
- assertEquals(0, read.size)
+ assertNull("Read beyond the last offset in the segment should be null", read)
}
+ /**
+ * Reading from before the first offset in the segment should return messages
+ * beginning with the first message in the segment
+ */
@Test
def testReadBeforeFirstOffset() {
val seg = createSegment(40)
@@ -54,24 +66,40 @@ class LogSegmentTest extends JUnit3Suite
assertEquals(ms.toList, read.toList)
}
- @Test
- def testReadSingleMessage() {
- val seg = createSegment(40)
- val ms = messages(50, "hello", "there")
- seg.append(50, ms)
- val read = seg.read(startOffset = 41, maxSize = 200, maxOffset = Some(50))
- assertEquals(new Message("hello".getBytes), read.head.message)
- }
-
+ /**
+ * If we set maxOffset for the read to startOffset + 1
+ * we should get only the single message at startOffset
+ */
+ @Test
+ def testMaxOffset() {
+ val baseOffset = 50
+ val seg = createSegment(baseOffset)
+ val ms = messages(baseOffset, "hello", "there", "beautiful")
+ seg.append(baseOffset, ms)
+ def validate(offset: Long) =
+ assertEquals(ms.filter(_.offset == offset).toList,
+ seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).toList)
+ validate(50)
+ validate(51)
+ validate(52)
+ }
+
+ /**
+ * If we read from an offset beyond the last offset in the segment we should get null
+ */
@Test
def testReadAfterLast() {
val seg = createSegment(40)
val ms = messages(50, "hello", "there")
seg.append(50, ms)
val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
- assertEquals(0, read.size)
+ assertNull("Read beyond the last offset in the segment should give null", read)
}
+ /**
+ * If we read from an offset which doesn't exist we should get a message set beginning
+ * with the least offset greater than the given startOffset.
+ */
@Test
def testReadFromGap() {
val seg = createSegment(40)
@@ -83,6 +111,10 @@ class LogSegmentTest extends JUnit3Suite
assertEquals(ms2.toList, read.toList)
}
+ /**
+ * In a loop append two messages then truncate off the second of those messages and check that we can read
+ * the first but not the second message.
+ */
@Test
def testTruncate() {
val seg = createSegment(40)
@@ -93,26 +125,33 @@ class LogSegmentTest extends JUnit3Suite
val ms2 = messages(offset+1, "hello")
seg.append(offset+1, ms2)
// check that we can read back both messages
- val read = seg.read(offset, 10000, None)
+ val read = seg.read(offset, None, 10000)
assertEquals(List(ms1.head, ms2.head), read.toList)
// now truncate off the last message
seg.truncateTo(offset + 1)
- val read2 = seg.read(offset, 10000, None)
+ val read2 = seg.read(offset, None, 10000)
assertEquals(1, read2.size)
assertEquals(ms1.head, read2.head)
offset += 1
}
}
+ /**
+ * Test truncating the whole segment, and check that we can reappend with the original offset.
+ */
@Test
def testTruncateFull() {
// test the case where we fully truncate the log
val seg = createSegment(40)
seg.append(40, messages(40, "hello", "there"))
seg.truncateTo(0)
+ assertNull("Segment should be empty.", seg.read(0, None, 1024))
seg.append(40, messages(40, "hello", "there"))
}
+ /**
+ * Test that offsets are assigned sequentially and that the nextOffset variable is incremented
+ */
@Test
def testNextOffsetCalculation() {
val seg = createSegment(40)
@@ -121,4 +160,50 @@ class LogSegmentTest extends JUnit3Suite
assertEquals(53, seg.nextOffset())
}
+ /**
+ * Create a segment with some data and an index. Then corrupt the index,
+ * and recover the segment, the entries should all be readable.
+ */
+ @Test
+ def testRecoveryFixesCorruptIndex() {
+ val seg = createSegment(0)
+ for(i <- 0 until 100)
+ seg.append(i, messages(i, i.toString))
+ val indexFile = seg.index.file
+ writeNonsense(indexFile, 5, indexFile.length.toInt)
+ seg.recover(64*1024)
+ for(i <- 0 until 100)
+ assertEquals(i, seg.read(i, Some(i+1), 1024).head.offset)
+ }
+
+ /**
+ * Randomly corrupt a log a number of times and attempt recovery.
+ */
+ @Test
+ def testRecoveryWithCorruptMessage() {
+ val rand = new Random(1)
+ val messagesAppended = 20
+ for(iteration <- 0 until 10) {
+ val seg = createSegment(0)
+ for(i <- 0 until messagesAppended)
+ seg.append(i, messages(i, i.toString))
+ val offsetToBeginCorruption = rand.nextInt(messagesAppended)
+ // start corrupting somewhere in the middle of the chosen record all the way to the end
+ val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15)
+ writeNonsense(seg.log.file, position, seg.log.file.length.toInt - position)
+ seg.recover(64*1024)
+ assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)
+ seg.delete()
+ }
+ }
+
+ def writeNonsense(fileName: File, position: Long, size: Int) {
+ val file = new RandomAccessFile(fileName, "rw")
+ file.seek(position)
+ val rand = new Random
+ for(i <- 0 until size)
+ file.writeByte(rand.nextInt(255))
+ file.close()
+ }
+
}
\ No newline at end of file
Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Sun Dec 2 20:50:01 2012
@@ -54,7 +54,10 @@ class LogTest extends JUnitSuite {
}
}
- /** Test that the size and time based log segment rollout works. */
+ /**
+ * Tests for time based log roll. This test appends messages then changes the time
+ * using the mock clock to force the log to roll and checks the number of segments.
+ */
@Test
def testTimeBasedLogRoll() {
val set = TestUtils.singleMessageSet("test".getBytes())
@@ -70,27 +73,26 @@ class LogTest extends JUnitSuite {
assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
log.append(set)
- assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
-
- // segment expires in age
- time.currentMs += rollMs + 1
- log.append(set)
- assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
+ assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments)
- time.currentMs += rollMs + 1
- val blank = Array[Message]()
- log.append(new ByteBufferMessageSet(new Message("blah".getBytes)))
- assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+ for(numSegments <- 2 until 4) {
+ time.currentMs += rollMs + 1
+ log.append(set)
+ assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
+ }
+ val numSegments = log.numberOfSegments
time.currentMs += rollMs + 1
- // the last segment expired in age, but was blank. So new segment should not be generated
log.append(new ByteBufferMessageSet())
- assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+ assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments)
}
+ /**
+ * Test that appending more than the maximum segment size rolls the log
+ */
@Test
def testSizeBasedLogRoll() {
- val set = TestUtils.singleMessageSet("test".getBytes())
+ val set = TestUtils.singleMessageSet("test".getBytes)
val setSize = set.sizeInBytes
val msgPerSeg = 10
val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
@@ -106,28 +108,80 @@ class LogTest extends JUnitSuite {
assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
}
+ /**
+ * Test that we can open and append to an empty log
+ */
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
- new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ log.append(TestUtils.singleMessageSet("test".getBytes))
}
+ /**
+ * This test case appends a bunch of messages and checks that we can read them all back using sequential offsets.
+ */
@Test
- def testAppendAndRead() {
- val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
- val message = new Message(Integer.toString(42).getBytes())
- for(i <- 0 until 10)
- log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
- log.flush()
- val messages = log.read(0, 1024)
- var current = 0
- for(curr <- messages) {
- assertEquals("Read message should equal written", message, curr.message)
- current += 1
+ def testAppendAndReadWithSequentialOffsets() {
+ val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
+
+ for(i <- 0 until messages.length)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
+ for(i <- 0 until messages.length) {
+ val read = log.read(i, 100, Some(i+1)).head
+ assertEquals("Offset read should match order appended.", i, read.offset)
+ assertEquals("Message should match appended.", messages(i), read.message)
}
- assertEquals(10, current)
+ assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size)
}
-
+
+ /**
+ * This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message
+ * from any offset less than the logEndOffset including offsets not appended.
+ */
+ @Test
+ def testAppendAndReadWithNonSequentialOffsets() {
+ val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
+ val messages = messageIds.map(id => new Message(id.toString.getBytes))
+
+ // now test the case that we give the offsets and use non-sequential offsets
+ for(i <- 0 until messages.length)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
+ for(i <- 50 until messageIds.max) {
+ val idx = messageIds.indexWhere(_ >= i)
+ val read = log.read(i, 100, None).head
+ assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
+ assertEquals("Message should match appended.", messages(idx), read.message)
+ }
+ }
+
+ /**
+ * This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment.
+ * Specifically we create a log where the last message in the first segment has offset 0. If we
+ * then read offset 1, we should expect this read to come from the second segment, even though the
+ * first segment has the greatest lower bound on the offset.
+ */
+ @Test
+ def testReadAtLogGap() {
+ val log = new Log(logDir, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+
+ // keep appending until we have two segments with only a single message in the second segment
+ while(log.numberOfSegments == 1)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
+
+ // now manually truncate off all but one message from the first segment to create a gap in the messages
+ log.logSegments.head.truncateTo(1)
+
+ assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset)
+ }
+
+ /**
+ * Test reading at the boundary of the log, specifically
+ * - reading from the logEndOffset should give an empty message set
+ * - reading beyond the log end offset should throw an OffsetOutOfRangeException
+ */
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
@@ -147,14 +201,17 @@ class LogTest extends JUnitSuite {
}
}
- /** Test that writing and reading beyond the log size boundary works */
+ /**
+ * Test that covers reads and writes on a multisegment log. This test appends a bunch of messages
+ * and then reads them all back and checks that the message read and offset matches what was appended.
+ */
@Test
def testLogRolls() {
/* create a multipart log with 100 messages */
val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
- val offsets = messageSets.map(log.append(_)._1)
+ val offsets = messageSets.map(log.append(_).firstOffset)
log.flush
/* do successive reads to ensure all our messages are there */
@@ -169,7 +226,9 @@ class LogTest extends JUnitSuite {
assertEquals("Should be no more messages", 0, lastRead.size)
}
- /** Test the case where we have compressed batches of messages */
+ /**
+ * Test reads at offsets that fall within compressed message set boundaries.
+ */
@Test
def testCompressedMessages() {
/* this log should roll after every messageset */
@@ -187,66 +246,46 @@ class LogTest extends JUnitSuite {
assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
}
-
- @Test
- def testFindSegment() {
- assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45))
- assertEquals("Search in segment list just outside the range of the last segment should find last segment",
- 9, Log.findRange(makeRanges(5, 9, 12), 12).get.start)
- assertEquals("Search in segment list far outside the range of the last segment should find last segment",
- 9, Log.findRange(makeRanges(5, 9, 12), 100).get.start)
- assertEquals("Search in segment list far outside the range of the last segment should find last segment",
- None, Log.findRange(makeRanges(5, 9, 12), -1))
- assertContains(makeRanges(5, 9, 12), 11)
- assertContains(makeRanges(5), 4)
- assertContains(makeRanges(5,8), 5)
- assertContains(makeRanges(5,8), 6)
- }
- @Test
- def testEdgeLogRollsStartingAtZero() {
- // first test a log segment starting at 0
- val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
- val curOffset = log.logEndOffset
- assertEquals(curOffset, 0)
-
- // time goes by; the log file is deleted
- log.markDeletedWhile(_ => true)
-
- // we now have a new log; the starting offset of the new log should remain 0
- assertEquals(curOffset, log.logEndOffset)
- log.delete()
- }
-
- @Test
- def testEdgeLogRollsStartingAtNonZero() {
- // second test an empty log segment starting at non-zero
- val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
- val numMessages = 1
- for(i <- 0 until numMessages)
- log.append(TestUtils.singleMessageSet(i.toString.getBytes))
- val curOffset = log.logEndOffset
-
- // time goes by; the log file is deleted
- log.markDeletedWhile(_ => true)
-
- // we now have a new log
- assertEquals(curOffset, log.logEndOffset)
-
- // time goes by; the log file (which is empty) is deleted again
- val deletedSegments = log.markDeletedWhile(_ => true)
-
- // we shouldn't delete the last empty log segment.
- assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0)
-
- // we now have a new log
- assertEquals(curOffset, log.logEndOffset)
+ /**
+ * Test garbage collecting old segments
+ */
+ @Test
+ def testThatGarbageCollectingSegmentsDoesntChangeOffset() {
+ for(messagesToAppend <- List(0, 1, 25)) {
+ logDir.mkdirs()
+ // create a fresh log and append the requested number of messages
+ val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ for(i <- 0 until messagesToAppend)
+ log.append(TestUtils.singleMessageSet(i.toString.getBytes))
+
+ var currOffset = log.logEndOffset
+ assertEquals(currOffset, messagesToAppend)
+
+ // time goes by; the log file is deleted
+ log.deleteOldSegments(_ => true)
+
+ assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset)
+ assertEquals("We should still have one segment left", 1, log.numberOfSegments)
+ assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true))
+ assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
+ assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
+ currOffset,
+ log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
+
+ // cleanup the log
+ log.delete()
+ }
}
+ /**
+ * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
+ * setting and checking that an exception is thrown.
+ */
@Test
def testMessageSizeCheck() {
- val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes()))
- val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
+ val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes))
+ val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes))
// append messages to log
val maxMessageSize = second.sizeInBytes - 1
@@ -259,10 +298,13 @@ class LogTest extends JUnitSuite {
log.append(second)
fail("Second message set should throw MessageSizeTooLargeException.")
} catch {
- case e:MessageSizeTooLargeException => // this is good
+ case e: MessageSizeTooLargeException => // this is good
}
}
+ /**
+ * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly.
+ */
@Test
def testLogRecoversToCorrectOffset() {
val numMessages = 100
@@ -273,25 +315,53 @@ class LogTest extends JUnitSuite {
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
- val lastIndexOffset = log.segments.view.last.index.lastOffset
- val numIndexEntries = log.segments.view.last.index.entries
+ val lastIndexOffset = log.activeSegment.index.lastOffset
+ val numIndexEntries = log.activeSegment.index.entries
log.close()
// test non-recovery case
log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
- assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
- assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
+ assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
+ assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
log.close()
- // test
+ // test recovery case
log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
- assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
- assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
+ assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
+ assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
+ log.close()
+ }
+
+ /**
+ * Test that if we manually delete an index segment it is rebuilt when the log is re-opened
+ */
+ @Test
+ def testIndexRebuild() {
+ // publish the messages and close the log
+ val numMessages = 200
+ var log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
+ for(i <- 0 until numMessages)
+ log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
+ val indexFiles = log.logSegments.map(_.index.file)
+ log.close()
+
+ // delete all the index files
+ indexFiles.foreach(_.delete())
+
+ // reopen the log
+ log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
+
+ assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
+ for(i <- 0 until numMessages)
+ assertEquals(i, log.read(i, 100, None).head.offset)
log.close()
}
+ /**
+ * Test the Log truncate operations
+ */
@Test
def testTruncateTo() {
val set = TestUtils.singleMessageSet("test".getBytes())
@@ -329,7 +399,7 @@ class LogTest extends JUnitSuite {
assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
assertEquals("Should be back to original size", log.size, size)
- log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1))
+ log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1))
assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1))
assertEquals("Should change log size", log.size, 0)
@@ -343,6 +413,9 @@ class LogTest extends JUnitSuite {
assertEquals("Should change log size", log.size, 0)
}
+ /**
+ * Verify that when we truncate a log the index of the last segment is resized to the max index size to allow more appends
+ */
@Test
def testIndexResizingAtTruncation() {
val set = TestUtils.singleMessageSet("test".getBytes())
@@ -357,48 +430,25 @@ class LogTest extends JUnitSuite {
for (i<- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
- assertEquals("The index of the first segment should be trim to empty", 0, log.segments.view(0).index.maxEntries)
+ assertEquals("The index of the first segment should be trim to empty", 0, log.logSegments.toList(0).index.maxEntries)
log.truncateTo(0)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
- assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8, log.segments.view(0).index.maxEntries)
+ assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8, log.logSegments.toList(0).index.maxEntries)
for (i<- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
}
-
- @Test
- def testAppendWithoutOffsetAssignment() {
- for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
- logDir.mkdir()
- var log = new Log(logDir,
- maxLogFileSize = 64*1024,
- maxMessageSize = config.maxMessageSize,
- maxIndexSize = 1000,
- indexIntervalBytes = 10000,
- needsRecovery = true)
- val messages = List("one", "two", "three", "four", "five", "six")
- val ms = new ByteBufferMessageSet(compressionCodec = codec,
- offsetCounter = new AtomicLong(5),
- messages = messages.map(s => new Message(s.getBytes)):_*)
- val firstOffset = ms.shallowIterator.toList.head.offset
- val lastOffset = ms.shallowIterator.toList.last.offset
- val (first, last) = log.append(ms, assignOffsets = false)
- assertEquals(last + 1, log.logEndOffset)
- assertEquals(firstOffset, first)
- assertEquals(lastOffset, last)
- assertTrue(log.read(5, 64*1024).size > 0)
- log.delete()
- }
- }
-
+ /**
+ * Verify that truncation works correctly after re-opening the log
+ */
@Test
def testReopenThenTruncate() {
val set = TestUtils.singleMessageSet("test".getBytes())
// create a log
var log = new Log(logDir,
- maxLogFileSize = set.sizeInBytes * 5,
+ maxSegmentSize = set.sizeInBytes * 5,
maxMessageSize = config.maxMessageSize,
maxIndexSize = 1000,
indexIntervalBytes = 10000,
@@ -409,7 +459,7 @@ class LogTest extends JUnitSuite {
log.append(set)
log.close()
log = new Log(logDir,
- maxLogFileSize = set.sizeInBytes * 5,
+ maxSegmentSize = set.sizeInBytes * 5,
maxMessageSize = config.maxMessageSize,
maxIndexSize = 1000,
indexIntervalBytes = 10000,
@@ -419,24 +469,4 @@ class LogTest extends JUnitSuite {
assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
}
- def assertContains(ranges: Array[Range], offset: Long) = {
- Log.findRange(ranges, offset) match {
- case Some(range) =>
- assertTrue(range + " does not contain " + offset, range.contains(offset))
- case None => fail("No range found, but expected to find " + offset)
- }
- }
-
- class SimpleRange(val start: Long, val size: Long) extends Range
-
- def makeRanges(breaks: Int*): Array[Range] = {
- val list = new ArrayList[Range]
- var prior = 0
- for(brk <- breaks) {
- list.add(new SimpleRange(prior, brk - prior))
- prior = brk
- }
- list.toArray(new Array[Range](list.size))
- }
-
}
Modified: kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala Sun Dec 2 20:50:01 2012
@@ -26,7 +26,7 @@ import org.junit.Test
trait BaseMessageSetTestCases extends JUnitSuite {
- val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
+ val messages = Array(new Message("abcd".getBytes), new Message("efgh".getBytes), new Message("ijkl".getBytes))
def createMessageSet(messages: Seq[Message]): MessageSet
Modified: kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Sun Dec 2 20:50:01 2012
@@ -397,6 +397,7 @@ class AsyncProducerTest extends JUnit3Su
props.put("serializer.class", classOf[StringEncoder].getName.toString)
props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
+ props.put("producer.num.retries", 3.toString)
val config = new ProducerConfig(props)
Modified: kafka/trunk/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Sun Dec 2 20:50:01 2012
@@ -40,7 +40,7 @@ class SimpleFetchTest extends JUnit3Suit
val partitionId = 0
/**
- * The scenario for this test is that there is one topic, "test-topic", on broker "0" that has
+ * The scenario for this test is that there is one topic, "test-topic", one broker "0" that has
* one partition with one follower replica on broker "1". The leader replica on "0"
* has HW of "5" and LEO of "20". The follower on broker "1" has a local replica
* with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
@@ -62,6 +62,7 @@ class SimpleFetchTest extends JUnit3Suit
val log = EasyMock.createMock(classOf[kafka.log.Log])
EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
+ EasyMock.expect(log)
EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(new ByteBufferMessageSet(messages))
EasyMock.replay(log)
@@ -102,33 +103,6 @@ class SimpleFetchTest extends JUnit3Suit
// make sure the log only reads bytes between 0->HW (5)
EasyMock.verify(log)
-
- // Test offset request from non-replica
- val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
- val offsetRequest = OffsetRequest(
- Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
- val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
-
- EasyMock.reset(logManager)
- EasyMock.reset(replicaManager)
-
- EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
- EasyMock.expect(replicaManager.logManager).andReturn(logManager)
- EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
-
- EasyMock.replay(replicaManager)
- EasyMock.replay(logManager)
-
- apis.handleOffsetRequest(new RequestChannel.Request(processor = 0,
- requestKey = 5,
- buffer = offsetRequestBB,
- startTimeMs = 1))
- val offsetResponseBuffer = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
- val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
- EasyMock.verify(replicaManager)
- EasyMock.verify(logManager)
- assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
- assertEquals(hw.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
}
/**
@@ -203,34 +177,6 @@ class SimpleFetchTest extends JUnit3Suit
* an offset of 15
*/
EasyMock.verify(log)
-
- // Test offset request from replica
- val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
- val offsetRequest = OffsetRequest(
- Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)),
- replicaId = followerReplicaId)
- val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
-
- EasyMock.reset(logManager)
- EasyMock.reset(replicaManager)
-
- EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
- EasyMock.expect(replicaManager.logManager).andReturn(logManager)
- EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
-
- EasyMock.replay(replicaManager)
- EasyMock.replay(logManager)
-
- apis.handleOffsetRequest(new RequestChannel.Request(processor = 1,
- requestKey = 5,
- buffer = offsetRequestBB,
- startTimeMs = 1))
- val offsetResponseBuffer = requestChannel.receiveResponse(1).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
- val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
- EasyMock.verify(replicaManager)
- EasyMock.verify(logManager)
- assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
- assertEquals(leo.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
}
private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
Modified: kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sun Dec 2 20:50:01 2012
@@ -127,7 +127,6 @@ object TestUtils extends Logging {
props.put("hostname", "localhost")
props.put("port", port.toString)
props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
- props.put("log.flush.interval", "1")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("replica.socket.timeout.ms", "1500")
props
Modified: kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Sun Dec 2 20:50:01 2012
@@ -33,7 +33,8 @@ class EmbeddedZookeeper(val connectStrin
factory.startup(zookeeper)
def shutdown() {
- factory.shutdown()
+ Utils.swallow(zookeeper.shutdown())
+ Utils.swallow(factory.shutdown())
Utils.rm(logDir)
Utils.rm(snapshotDir)
}
Modified: kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala Sun Dec 2 20:50:01 2012
@@ -19,7 +19,7 @@ package kafka.zk
import org.scalatest.junit.JUnit3Suite
import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, TestZKUtils}
+import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}
trait ZooKeeperTestHarness extends JUnit3Suite {
val zkConnect: String = TestZKUtils.zookeeperConnect
@@ -36,8 +36,8 @@ trait ZooKeeperTestHarness extends JUnit
override def tearDown() {
super.tearDown
- zkClient.close()
- zookeeper.shutdown()
+ Utils.swallow(zkClient.close())
+ Utils.swallow(zookeeper.shutdown())
}
}
Modified: kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java
URL: http://svn.apache.org/viewvc/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java?rev=1416253&r1=1416252&r2=1416253&view=diff
==============================================================================
--- kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java (original)
+++ kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java Sun Dec 2 20:50:01 2012
@@ -25,7 +25,6 @@ import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.Message;
public class Consumer extends Thread