You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/08/19 02:51:20 UTC
svn commit: r1159461 - in
/incubator/kafka/trunk/core/src/test/scala/unit/kafka:
message/ByteBufferMessageSetTest.scala utils/TestUtils.scala
Author: nehanarkhede
Date: Fri Aug 19 00:51:20 2011
New Revision: 1159461
URL: http://svn.apache.org/viewvc?rev=1159461&view=rev
Log:
Some new unit tests for ByteBufferMessageSet iterator KAFKA-108; patched by Jun; reviewed by Neha
Modified:
incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1159461&r1=1159460&r2=1159461&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Fri Aug 19 00:51:20 2011
@@ -20,6 +20,7 @@ package kafka.message
import java.nio._
import junit.framework.Assert._
import org.junit.Test
+import kafka.utils.TestUtils
class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
@@ -49,4 +50,68 @@ class ByteBufferMessageSetTest extends B
assertTrue(messages.equals(moreMessages))
}
+
+ /**
+  * Exercises ByteBufferMessageSet iteration in four scenarios: a regular message set
+  * (uncompressed and compressed), and a buffer that holds an empty message set
+  * followed by a regular one (uncompressed and compressed).
+  */
+ @Test
+ def testIterator() {
+   val messageList = List(
+     new Message("msg1".getBytes),
+     new Message("msg2".getBytes),
+     new Message("msg3".getBytes)
+   )
+
+   // Assertions shared by every scenario; these used to be repeated once per scenario.
+   def verifyIteration(messageSet: ByteBufferMessageSet) {
+     TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
+     //make sure ByteBufferMessageSet is re-iterable.
+     TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
+     //make sure the last offset after iteration is correct
+     assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
+   }
+
+   // Copies the serialized bytes of an empty set and then a regular set into one buffer
+   // and wraps that buffer in a new ByteBufferMessageSet.
+   def emptyThenRegular(emptyMessageSet: ByteBufferMessageSet,
+                        regularMessageSet: ByteBufferMessageSet): ByteBufferMessageSet = {
+     val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessageSet.serialized.limit)
+     buffer.put(emptyMessageSet.serialized)
+     buffer.put(regularMessageSet.serialized)
+     // put() advanced the position; go back to the start so the new set reads from offset 0
+     buffer.rewind
+     new ByteBufferMessageSet(buffer, 0, 0)
+   }
+
+   val emptyMessageList : List[Message] = Nil
+
+   // test for uncompressed regular messages
+   verifyIteration(new ByteBufferMessageSet(NoCompressionCodec, messageList: _*))
+
+   // test for compressed regular messages
+   verifyIteration(new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*))
+
+   // test for mixed empty and non-empty messagesets uncompressed
+   verifyIteration(emptyThenRegular(
+     new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*),
+     new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)))
+
+   // test for mixed empty and non-empty messagesets compressed
+   verifyIteration(emptyThenRegular(
+     new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*),
+     new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)))
+ }
+
}
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1159461&r1=1159460&r2=1159461&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Fri Aug 19 00:51:20 2011
@@ -288,6 +288,18 @@ object TestUtils {
ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
}
+
+ /**
+  * Adapts an iterator of MessageAndOffset into an iterator over the wrapped messages only.
+  *
+  * @param iter the underlying iterator; it is consumed lazily, one element per makeNext() call
+  * @return an iterator yielding `message` of each element of `iter`
+  */
+ def getMessageIterator(iter: Iterator[MessageAndOffset]): Iterator[Message] = {
+   new IteratorTemplate[Message] {
+     // The if/else is the result expression, so no explicit `return` is needed.
+     // When the source is exhausted, the result of allDone() is handed back to the template.
+     override def makeNext(): Message =
+       if (iter.hasNext) iter.next.message else allDone()
+   }
+ }
+
}
object TestZKUtils {