You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/08/19 02:51:20 UTC

svn commit: r1159461 - in /incubator/kafka/trunk/core/src/test/scala/unit/kafka: message/ByteBufferMessageSetTest.scala utils/TestUtils.scala

Author: nehanarkhede
Date: Fri Aug 19 00:51:20 2011
New Revision: 1159461

URL: http://svn.apache.org/viewvc?rev=1159461&view=rev
Log:
Some new unit tests for ByteBufferMessageSet iterator KAFKA-108; patched by Jun; reviewed by Neha

Modified:
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1159461&r1=1159460&r2=1159461&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Fri Aug 19 00:51:20 2011
@@ -20,6 +20,7 @@ package kafka.message
 import java.nio._
 import junit.framework.Assert._
 import org.junit.Test
+import kafka.utils.TestUtils
 
 class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
 
@@ -49,4 +50,68 @@ class ByteBufferMessageSetTest extends B
     assertTrue(messages.equals(moreMessages))
   }
   
+
+  /**
+   * Checks the iterator of ByteBufferMessageSet against a fixed list of three messages.
+   *
+   * For each message set under test it asserts that:
+   *  - iterating yields exactly the messages the set was built from,
+   *  - a second iteration yields the same messages (the set is re-iterable),
+   *  - the offset of the last entry equals the limit of the serialized buffer.
+   *
+   * The checks run against uncompressed and compressed sets, both on their own and
+   * preceded by an empty message set in the same buffer.
+   */
+  @Test
+  def testIterator() {
+    val messageList = List(
+        new Message("msg1".getBytes),
+        new Message("msg2".getBytes),
+        new Message("msg3".getBytes)
+      )
+
+    val emptyMessageList : List[Message] = Nil
+
+    // Shared assertions for every flavour of message set exercised below.
+    def verifyIteration(messageSet: ByteBufferMessageSet) {
+      val firstPass = TestUtils.getMessageIterator(messageSet.iterator)
+      TestUtils.checkEquals[Message](messageList.iterator, firstPass)
+      //make sure ByteBufferMessageSet is re-iterable.
+      val secondPass = TestUtils.getMessageIterator(messageSet.iterator)
+      TestUtils.checkEquals[Message](messageList.iterator, secondPass)
+      //make sure the last offset after iteration is correct; JUnit takes the expected value
+      //(the buffer limit) before the actual one (the offset of the last entry).
+      assertEquals("offset of last message not expected", messageSet.serialized.limit, messageSet.last.offset)
+    }
+
+    // Lays the serialized bytes of an empty set and a regular set back to back in a single
+    // buffer and wraps the result in a new message set.
+    def concatenate(emptyMessageSet: ByteBufferMessageSet,
+                    regularMessageSet: ByteBufferMessageSet): ByteBufferMessageSet = {
+      val totalSize = emptyMessageSet.serialized.limit + regularMessageSet.serialized.limit
+      val buffer = ByteBuffer.allocate(totalSize)
+      buffer.put(emptyMessageSet.serialized)
+      buffer.put(regularMessageSet.serialized)
+      // move the position back to the start before handing the buffer over
+      buffer.rewind()
+      new ByteBufferMessageSet(buffer, 0, 0)
+    }
+
+    // test for uncompressed regular messages
+    verifyIteration(new ByteBufferMessageSet(NoCompressionCodec, messageList: _*))
+
+    // test for compressed regular messages
+    verifyIteration(new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*))
+
+    // test for mixed empty and non-empty messagesets uncompressed
+    val emptyUncompressed = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
+    val regularUncompressed = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
+    verifyIteration(concatenate(emptyUncompressed, regularUncompressed))
+
+    // test for mixed empty and non-empty messagesets compressed
+    val emptyCompressed = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
+    val regularCompressed = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
+    verifyIteration(concatenate(emptyCompressed, regularCompressed))
+  }
+
 }

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1159461&r1=1159460&r2=1159461&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Fri Aug 19 00:51:20 2011
@@ -288,6 +288,18 @@ object TestUtils {
     ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
 
   }
+
+  /**
+   * Wraps an iterator of MessageAndOffset so that it yields only the message of each entry.
+   */
+  def getMessageIterator(iter: Iterator[MessageAndOffset]): Iterator[Message] = {
+    new IteratorTemplate[Message] {
+      // Once iter is drained, return whatever allDone() gives (presumably the end marker).
+      override def makeNext(): Message =
+        if (iter.hasNext) iter.next.message else allDone()
+    }
+  }
+
 }
 
 object TestZKUtils {