You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/13 04:58:39 UTC

svn commit: r1243407 [2/2] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/ core/src/main/scala/kafka/javaapi/cons...

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala Mon Feb 13 03:58:37 2012
@@ -18,15 +18,15 @@
 package kafka.javaapi.integration
 
 import scala.collection._
-import kafka.api.FetchRequest
+import kafka.api.FetchRequestBuilder
 import kafka.common.{InvalidPartitionException, OffsetOutOfRangeException}
-import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
-import kafka.javaapi.message.ByteBufferMessageSet
 import kafka.javaapi.ProducerRequest
+import kafka.javaapi.message.ByteBufferMessageSet
 import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message}
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import kafka.utils.TestUtils
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -43,39 +43,42 @@ class PrimitiveApiTest extends JUnit3Sui
     // send some messages
     val topic = "test"
 
-//    send an empty messageset first
-    val sent2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                         messages = getMessageList(Seq.empty[Message]: _*))
+    // send an empty messageset first
+    val sent2 = new ByteBufferMessageSet(NoCompressionCodec, getMessageList(Seq.empty[Message]: _*))
     producer.send(topic, sent2)
+
     Thread.sleep(200)
     sent2.getBuffer.rewind
-    var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
-    TestUtils.checkEquals(sent2.iterator, fetched2.iterator)
+    val fetched2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+    val fetchedMessage2 = fetched2.messageSet(topic, 0)
+    TestUtils.checkEquals(sent2.iterator, fetchedMessage2.iterator)
 
 
     // send some messages
-    val sent3 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                         messages = getMessageList(new Message("hello".getBytes()),
-      new Message("there".getBytes())))
+    val sent3 = new ByteBufferMessageSet(NoCompressionCodec,
+                                         getMessageList(
+                                           new Message("hello".getBytes()),new Message("there".getBytes())))
     producer.send(topic, sent3)
 
     Thread.sleep(200)
     sent3.getBuffer.rewind
-    var fetched3: ByteBufferMessageSet = null
-    while(fetched3 == null || fetched3.validBytes == 0)
-      fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
-    TestUtils.checkEquals(sent3.iterator, fetched3.iterator)
+    var messageSet: ByteBufferMessageSet = null
+    while(messageSet == null || messageSet.validBytes == 0) {
+      val fetched3 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+      messageSet = fetched3.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet]
+    }
+    TestUtils.checkEquals(sent3.iterator, messageSet.iterator)
 
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send an invalid offset
     try {
-      val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
-      fetchedWithError.iterator
-      fail("expect exception")
-    }
-    catch {
+      val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
+      val messageWithError = fetchedWithError.messageSet(topic, 0)
+      messageWithError.iterator
+      fail("Fetch with invalid offset should throw an exception when iterating over response")
+    } catch {
       case e: OffsetOutOfRangeException => "this is good"
     }
 
@@ -87,39 +90,42 @@ class PrimitiveApiTest extends JUnit3Sui
     // send some messages
     val topic = "test"
 
-//    send an empty messageset first
-    val sent2 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
-                                         messages = getMessageList(Seq.empty[Message]: _*))
+    // send an empty messageset first
+    val sent2 = new ByteBufferMessageSet(DefaultCompressionCodec, getMessageList(Seq.empty[Message]: _*))
     producer.send(topic, sent2)
+
     Thread.sleep(200)
     sent2.getBuffer.rewind
-    var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
-    TestUtils.checkEquals(sent2.iterator, fetched2.iterator)
+    val fetched2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+    val message2 = fetched2.messageSet(topic, 0)
+    TestUtils.checkEquals(sent2.iterator, message2.iterator)
 
 
     // send some messages
-    val sent3 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
-                                         messages = getMessageList(new Message("hello".getBytes()),
-      new Message("there".getBytes())))
+    val sent3 = new ByteBufferMessageSet( DefaultCompressionCodec,
+                                          getMessageList(
+                                            new Message("hello".getBytes()),new Message("there".getBytes())))
     producer.send(topic, sent3)
 
     Thread.sleep(200)
     sent3.getBuffer.rewind
-    var fetched3: ByteBufferMessageSet = null
-    while(fetched3 == null || fetched3.validBytes == 0)
-      fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
-    TestUtils.checkEquals(sent3.iterator, fetched3.iterator)
+    var fetchedMessage: ByteBufferMessageSet = null
+    while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+      val fetched3 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+      fetchedMessage = fetched3.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet]
+    }
+    TestUtils.checkEquals(sent3.iterator, fetchedMessage.iterator)
 
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send an invalid offset
     try {
-      val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
-      fetchedWithError.iterator
-      fail("expect exception")
-    }
-    catch {
+      val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
+      val messageWithError = fetchedWithError.messageSet(topic, 0)
+      messageWithError.iterator
+      fail("Fetch with invalid offset should throw an exception when iterating over response")
+    } catch {
       case e: OffsetOutOfRangeException => "this is good"
     }
 
@@ -129,31 +135,27 @@ class PrimitiveApiTest extends JUnit3Sui
 
   def testProduceAndMultiFetch() {
     // send some messages
-    val topics = List("test1", "test2", "test3");
+    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
       val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics) {
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topics) {
         val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                            messages = getMessageList(new Message(("a_" + topic).getBytes),
                                                                      new Message(("b_" + topic).getBytes)))
         messages += topic -> set
         producer.send(topic, set)
         set.getBuffer.rewind
-        fetches += new FetchRequest(topic, 0, 0, 10000)
+        builder.addFetch(topic, partition, 0, 10000)
       }
 
       // wait a bit for produced message to be available
       Thread.sleep(200)
-      val response = consumer.multifetch(getFetchRequestList(fetches: _*))
-      val iter = response.iterator
-      for(topic <- topics) {
-        if (iter.hasNext) {
-          val resp = iter.next
-          TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
-        }
-        else
-          fail("fewer responses than expected")
+      val request = builder.build()
+      val response = consumer.fetch(request)
+      for( (topic, partition) <- topics) {
+        val messageSet = response.messageSet(topic, partition)
+        TestUtils.checkEquals(messages(topic).iterator, messageSet.iterator)
       }
     }
 
@@ -162,37 +164,41 @@ class PrimitiveApiTest extends JUnit3Sui
 
     {
       // send some invalid offsets
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics)
-        fetches += new FetchRequest(topic, 0, -1, 10000)
-
-      try {
-        val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
-        val iter = responses.iterator
-        while (iter.hasNext)
-          iter.next.iterator
-        fail("expect exception")
-      }
-      catch {
-        case e: OffsetOutOfRangeException => "this is good"
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topics)
+         builder.addFetch(topic, partition, -1, 10000)
+
+      val request = builder.build()
+      val response = consumer.fetch(request)
+      for( (topic, partition) <- topics) {
+        try {
+            val iter = response.messageSet(topic, partition).iterator
+            while (iter.hasNext)
+              iter.next
+            fail("MessageSet for invalid offset should throw exception")
+        } catch {
+          case e: OffsetOutOfRangeException => "this is good"
+        }
       }
     }    
 
     {
       // send some invalid partitions
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics)
-        fetches += new FetchRequest(topic, -1, 0, 10000)
-
-      try {
-        val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
-        val iter = responses.iterator
-        while (iter.hasNext)
-          iter.next.iterator
-        fail("expect exception")
-      }
-      catch {
-        case e: InvalidPartitionException => "this is good"
+      val builder = new FetchRequestBuilder()
+      for( (topic, _) <- topics)
+        builder.addFetch(topic, -1, 0, 10000)
+
+      val request = builder.build()
+      val response = consumer.fetch(request)
+      for( (topic, _) <- topics) {
+        try {
+          val iter = response.messageSet(topic, -1).iterator
+          while (iter.hasNext)
+            iter.next
+          fail("MessageSet for invalid partition should throw exception")
+        } catch {
+          case e: InvalidPartitionException => "this is good"
+        }
       }
     }
 
@@ -202,31 +208,31 @@ class PrimitiveApiTest extends JUnit3Sui
 
   def testProduceAndMultiFetchWithCompression() {
     // send some messages
-    val topics = List("test1", "test2", "test3");
+    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
       val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics) {
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topics) {
         val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
                                            messages = getMessageList(new Message(("a_" + topic).getBytes),
                                                                      new Message(("b_" + topic).getBytes)))
         messages += topic -> set
         producer.send(topic, set)
         set.getBuffer.rewind
-        fetches += new FetchRequest(topic, 0, 0, 10000)
+        builder.addFetch(topic, partition, 0, 10000)
       }
 
       // wait a bit for produced message to be available
       Thread.sleep(200)
-      val response = consumer.multifetch(getFetchRequestList(fetches: _*))
-      val iter = response.iterator
-      for(topic <- topics) {
+      val request = builder.build()
+      val response = consumer.fetch(request)
+      for( (topic, partition) <- topics) {
+        val iter = response.messageSet(topic, partition).iterator
         if (iter.hasNext) {
-          val resp = iter.next
-          TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
-        }
-        else
+          TestUtils.checkEquals(messages(topic).iterator, iter)
+        } else {
           fail("fewer responses than expected")
+        }
       }
     }
 
@@ -235,37 +241,41 @@ class PrimitiveApiTest extends JUnit3Sui
 
     {
       // send some invalid offsets
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics)
-        fetches += new FetchRequest(topic, 0, -1, 10000)
-
-      try {
-        val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
-        val iter = responses.iterator
-        while (iter.hasNext)
-          iter.next.iterator
-        fail("expect exception")
-      }
-      catch {
-        case e: OffsetOutOfRangeException => "this is good"
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topics)
+        builder.addFetch(topic, partition, -1, 10000)
+
+      val request = builder.build()
+      val response = consumer.fetch(request)
+      for( (topic, partition) <- topics) {
+        try {
+          val iter = response.messageSet(topic, partition).iterator
+          while (iter.hasNext)
+            iter.next
+          fail("Expected exception when fetching invalid offset")
+        } catch {
+          case e: OffsetOutOfRangeException => "this is good"
+        }
       }
     }
 
     {
       // send some invalid partitions
-      val fetches = new mutable.ArrayBuffer[FetchRequest]
-      for(topic <- topics)
-        fetches += new FetchRequest(topic, -1, 0, 10000)
-
-      try {
-        val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
-        val iter = responses.iterator
-        while (iter.hasNext)
-          iter.next.iterator
-        fail("expect exception")
-      }
-      catch {
-        case e: InvalidPartitionException => "this is good"
+      val builder = new FetchRequestBuilder()
+      for( (topic, _) <- topics)
+        builder.addFetch(topic, -1, 0, 10000)
+
+      val request = builder.build()
+      val response = consumer.fetch(request)
+      for( (topic, _) <- topics) {
+        try {
+          val iter = response.messageSet(topic, -1).iterator
+          while (iter.hasNext)
+            iter.next
+          fail("Expected exception when fetching invalid partition")
+        } catch {
+          case e: InvalidPartitionException => "this is good"
+        }
       }
     }
 
@@ -275,79 +285,75 @@ class PrimitiveApiTest extends JUnit3Sui
 
   def testProduceAndMultiFetchJava() {
     // send some messages
-    val topics = List("test1", "test2", "test3");
+    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
       val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-      val fetches : java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest]
-      for(topic <- topics) {
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topics) {
         val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                            messages = getMessageList(new Message(("a_" + topic).getBytes),
                                                                      new Message(("b_" + topic).getBytes)))
         messages += topic -> set
         producer.send(topic, set)
         set.getBuffer.rewind
-        fetches.add(new FetchRequest(topic, 0, 0, 10000))
+        builder.addFetch(topic, partition, 0, 10000)
       }
 
       // wait a bit for produced message to be available
       Thread.sleep(200)
-      val response = consumer.multifetch(fetches)
-      val iter = response.iterator
-      for(topic <- topics) {
+      val request = builder.build()
+      val response = consumer.fetch(request)
+      for( (topic, partition) <- topics) {
+        val iter = response.messageSet(topic, partition).iterator
         if (iter.hasNext) {
-          val resp = iter.next
-          TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
-        }
-        else
+          TestUtils.checkEquals(messages(topic).iterator, iter)
+        } else {
           fail("fewer responses than expected")
+        }
       }
     }
   }
 
   def testProduceAndMultiFetchJavaWithCompression() {
     // send some messages
-    val topics = List("test1", "test2", "test3");
+    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
       val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-      val fetches : java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest]
-      for(topic <- topics) {
+      val builder = new FetchRequestBuilder()
+      for( (topic, partition) <- topics) {
         val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
                                            messages = getMessageList(new Message(("a_" + topic).getBytes),
                                                                      new Message(("b_" + topic).getBytes)))
         messages += topic -> set
         producer.send(topic, set)
         set.getBuffer.rewind
-        fetches.add(new FetchRequest(topic, 0, 0, 10000))
+        builder.addFetch(topic, partition, 0, 10000)
       }
 
       // wait a bit for produced message to be available
       Thread.sleep(200)
-      val response = consumer.multifetch(fetches)
-      val iter = response.iterator
-      for(topic <- topics) {
-        if (iter.hasNext) {
-          val resp = iter.next
-          TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
-        }
-        else
-          fail("fewer responses than expected")
+      val request = builder.build()
+      val response = consumer.fetch(request)
+      for( (topic, partition) <- topics) {
+        val iter = response.messageSet(topic, partition).iterator
+        TestUtils.checkEquals(messages(topic).iterator, iter)
       }
     }
   }
 
   def testMultiProduce() {
     // send some messages
-    val topics = List("test1", "test2", "test3");
+    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-    val fetches = new mutable.ArrayBuffer[FetchRequest]
+    val builder = new FetchRequestBuilder()
     var produceList: List[ProducerRequest] = Nil
-    for(topic <- topics) {
+    for( (topic, partition) <- topics) {
       val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                          messages = getMessageList(new Message(("a_" + topic).getBytes),
                                                                    new Message(("b_" + topic).getBytes)))
       messages += topic -> set
       produceList ::= new ProducerRequest(topic, 0, set)
-      fetches += new FetchRequest(topic, 0, 0, 10000)
+      builder.addFetch(topic, partition, 0, 10000)
     }
     producer.multiSend(produceList.toArray)
 
@@ -356,31 +362,31 @@ class PrimitiveApiTest extends JUnit3Sui
       
     // wait a bit for produced message to be available
     Thread.sleep(200)
-    val response = consumer.multifetch(getFetchRequestList(fetches: _*))
-    val iter = response.iterator
-    for(topic <- topics) {
+    val request = builder.build()
+    val response = consumer.fetch(request)
+    for( (topic, partition) <- topics) {
+      val iter = response.messageSet(topic, partition).iterator
       if (iter.hasNext) {
-        val resp = iter.next
-        TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
-      }
-      else
+        TestUtils.checkEquals(messages(topic).iterator, iter)
+      } else {
         fail("fewer responses than expected")
+      }
     }
   }
 
   def testMultiProduceWithCompression() {
     // send some messages
-    val topics = List("test1", "test2", "test3");
+    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     val messages = new mutable.HashMap[String, ByteBufferMessageSet]
-    val fetches = new mutable.ArrayBuffer[FetchRequest]
+    val builder = new FetchRequestBuilder()
     var produceList: List[ProducerRequest] = Nil
-    for(topic <- topics) {
+    for( (topic, partition) <- topics) {
       val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
                                          messages = getMessageList(new Message(("a_" + topic).getBytes),
                                                                    new Message(("b_" + topic).getBytes)))
       messages += topic -> set
       produceList ::= new ProducerRequest(topic, 0, set)
-      fetches += new FetchRequest(topic, 0, 0, 10000)
+      builder.addFetch(topic, partition, 0, 10000)
     }
     producer.multiSend(produceList.toArray)
 
@@ -389,15 +395,15 @@ class PrimitiveApiTest extends JUnit3Sui
 
     // wait a bit for produced message to be available
     Thread.sleep(200)
-    val response = consumer.multifetch(getFetchRequestList(fetches: _*))
-    val iter = response.iterator
-    for(topic <- topics) {
+    val request = builder.build()
+    val response = consumer.fetch(request)
+    for( (topic, partition) <- topics) {
+      val iter = response.messageSet(topic, partition).iterator
       if (iter.hasNext) {
-        val resp = iter.next
-        TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
-      }
-      else
+        TestUtils.checkEquals(messages(topic).iterator, iter)
+      } else {
         fail("fewer responses than expected")
+      }
     }
   }
 
@@ -406,10 +412,4 @@ class PrimitiveApiTest extends JUnit3Sui
     messages.foreach(m => messageList.add(m))
     messageList
   }
-
-  private def getFetchRequestList(fetches: FetchRequest*): java.util.List[FetchRequest] = {
-    val fetchReqs = new java.util.ArrayList[FetchRequest]()
-    fetches.foreach(f => fetchReqs.add(f))
-    fetchReqs
-  }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Mon Feb 13 03:58:37 2012
@@ -22,7 +22,6 @@ import kafka.utils._
 import kafka.server.{KafkaConfig, KafkaServer}
 import junit.framework.Assert._
 import java.util.{Random, Properties}
-import kafka.api.{FetchRequest, OffsetRequest}
 import collection.mutable.WrappedArray
 import kafka.consumer.SimpleConsumer
 import org.junit.{After, Before, Test}
@@ -30,6 +29,7 @@ import kafka.message.{NoCompressionCodec
 import org.apache.log4j._
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
+import kafka.api.{FetchRequestBuilder, OffsetRequest}
 
 object LogOffsetTest {
   val random = new Random()  
@@ -66,9 +66,8 @@ class LogOffsetTest extends JUnit3Suite 
 
   @Test
   def testEmptyLogs() {
-    val messageSet: ByteBufferMessageSet = simpleConsumer.fetch(
-      new FetchRequest("test", 0, 0, 300 * 1024))
-    assertFalse(messageSet.iterator.hasNext)
+    val fetchResponse = simpleConsumer.fetch(new FetchRequestBuilder().addFetch("test", 0, 0, 300 * 1024).build())
+    assertFalse(fetchResponse.messageSet("test", 0).iterator.hasNext)
 
     val name = "test"
     val logFile = new File(logDir, name + "-0")
@@ -119,9 +118,9 @@ class LogOffsetTest extends JUnit3Suite 
     assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
 
     // try to fetch using latest offset
-    val messageSet: ByteBufferMessageSet = simpleConsumer.fetch(
-      new FetchRequest(topic, 0, consumerOffsets.head, 300 * 1024))
-    assertFalse(messageSet.iterator.hasNext)
+    val fetchResponse = simpleConsumer.fetch(
+      new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build())
+    assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext)
   }
 
   @Test

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Mon Feb 13 03:58:37 2012
@@ -17,23 +17,21 @@
 
 package kafka.log4j
 
-import org.apache.log4j.spi.LoggingEvent
-import org.apache.log4j.{PropertyConfigurator, Logger}
 import java.util.Properties
 import java.io.File
-import kafka.consumer.SimpleConsumer
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.TestZKUtils
-import kafka.zk.EmbeddedZookeeper
 import junit.framework.Assert._
-import kafka.api.FetchRequest
-import kafka.serializer.Encoder
+import kafka.api.FetchRequestBuilder
+import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.producer.async.MissingConfigException
+import kafka.serializer.Encoder
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{TestUtils, TestZKUtils, Utils, Logging}
+import kafka.zk.{EmbeddedZookeeper, ZooKeeperTestHarness}
+import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.{PropertyConfigurator, Logger}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnit3Suite
-import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestUtils, Utils, Logging}
 
 class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
@@ -172,10 +170,10 @@ class KafkaLog4jAppenderTest extends JUn
     Thread.sleep(2500)
 
     var offset = 0L
-    val messages = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024))
-
+    val response = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, offset, 1024*1024).build())
+    val fetchedMessage = response.messageSet("test-topic", 0)
     var count = 0
-    for(message <- messages) {
+    for(message <- fetchedMessage) {
       count = count + 1
       offset += message.offset
     }
@@ -192,14 +190,16 @@ class KafkaLog4jAppenderTest extends JUn
 
     Thread.sleep(500)
 
-    val messages = simpleConsumerZk.fetch(new FetchRequest("test-topic", 0, 0L, 1024*1024))
+    val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build())
+    val fetchMessage = response.messageSet("test-topic", 0)
 
     var count = 0
-    for(message <- messages) {
+    for(message <- fetchMessage) {
       count = count + 1
     }
 
-    val messagesFromOtherBroker = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, 0L, 1024*1024))
+    val response2 = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build())
+    val messagesFromOtherBroker = response2.messageSet("test-topic", 0)
 
     for(message <- messagesFromOtherBroker) {
       count = count + 1

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Mon Feb 13 03:58:37 2012
@@ -17,18 +17,18 @@
 
 package kafka.producer
 
-import org.apache.log4j.{Logger, Level}
-import kafka.zk.EmbeddedZookeeper
-import org.junit.{After, Before, Test}
 import junit.framework.Assert._
-import org.scalatest.junit.JUnitSuite
-import kafka.utils.{TestUtils, TestZKUtils, Utils}
-import kafka.api.FetchRequest
+import java.util.Properties
+import kafka.api.FetchRequestBuilder
+import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.serializer.Encoder
-import kafka.consumer.SimpleConsumer
-import java.util.Properties
 import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
+import kafka.utils.{TestUtils, TestZKUtils, Utils}
+import kafka.zk.EmbeddedZookeeper
+import org.apache.log4j.{Logger, Level}
+import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnitSuite
 
 class ProducerTest extends JUnitSuite {
   private val topic = "test-topic"
@@ -106,12 +106,14 @@ class ProducerTest extends JUnitSuite {
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
       Thread.sleep(100)
       // cross check if brokers got the messages
-      val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
-      assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      assertEquals(new Message("test1".getBytes), messageSet1.next.message)
-      val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
-      assertTrue("Message set should have 1 message", messageSet2.hasNext)
-      assertEquals(new Message("test1".getBytes), messageSet2.next.message)
+      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val messageSet1 = response1.messageSet("new-topic", 0)
+      assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet1.head.message)
+      val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val messageSet2 = response2.messageSet("new-topic", 0)
+      assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet2.head.message)
     } catch {
       case e: Exception => fail("Not expected", e)
     }
@@ -142,11 +144,12 @@ class ProducerTest extends JUnitSuite {
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
       Thread.sleep(100)
       // cross check if brokers got the messages
-      val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
-      assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      assertEquals(new Message("test1".getBytes), messageSet1.next.message)
-      assertTrue("Message set should have another message", messageSet1.hasNext)
-      assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val messageSet1Iter = response1.messageSet("new-topic", 0).iterator
+      assertTrue("Message set should have 1 message", messageSet1Iter.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet1Iter.next.message)
+      assertTrue("Message set should have another message", messageSet1Iter.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet1Iter.next.message)
     } catch {
       case e: Exception => fail("Not expected")
     }
@@ -174,9 +177,10 @@ class ProducerTest extends JUnitSuite {
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
       Thread.sleep(100)
       // cross check if brokers got the messages
-      val messageSet1 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
-      assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet1.next.message)
+      val response1 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val messageSet1 = response1.messageSet("new-topic", 0)
+      assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext)
+      assertEquals(new Message("test".getBytes), messageSet1.head.message)
 
       // shutdown server2
       server2.shutdown
@@ -197,9 +201,10 @@ class ProducerTest extends JUnitSuite {
       Thread.sleep(100)
 
       // cross check if brokers got the messages
-      val messageSet2 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
-      assertTrue("Message set should have 1 message", messageSet2.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet2.next.message)
+      val response2 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val messageSet2 = response2.messageSet("new-topic", 0)
+      assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext)
+      assertEquals(new Message("test".getBytes), messageSet2.head.message)
 
     } catch {
       case e: Exception => fail("Not expected", e)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Mon Feb 13 03:58:37 2012
@@ -17,7 +17,6 @@
 package kafka.server
 
 import java.io.File
-import kafka.api.FetchRequest
 import kafka.producer.{SyncProducer, SyncProducerConfig}
 import kafka.consumer.SimpleConsumer
 import java.util.Properties
@@ -27,6 +26,7 @@ import kafka.message.{NoCompressionCodec
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{TestUtils, Utils}
+import kafka.api.{FetchResponse, FetchRequestBuilder, FetchRequest}
 
 class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
@@ -82,11 +82,13 @@ class ServerShutdownTest extends JUnit3S
       server.startup()
 
       // bring the server back again and read the messages
-      var fetched: ByteBufferMessageSet = null
-      while(fetched == null || fetched.validBytes == 0)
-        fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
-      TestUtils.checkEquals(sent1.iterator, fetched.iterator)
-      val newOffset = fetched.validBytes
+      var fetchedMessage: ByteBufferMessageSet = null
+      while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+        val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+        fetchedMessage = fetched.messageSet(topic, 0)
+      }
+      TestUtils.checkEquals(sent1.iterator, fetchedMessage.iterator)
+      val newOffset = fetchedMessage.validBytes
 
       // send some more messages
       producer.send(topic, sent2)
@@ -94,10 +96,12 @@ class ServerShutdownTest extends JUnit3S
 
       Thread.sleep(200)
 
-      fetched = null
-      while(fetched == null || fetched.validBytes == 0)
-        fetched = consumer.fetch(new FetchRequest(topic, 0, newOffset, 10000))
-      TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetched.map(m => m.message).iterator)
+      fetchedMessage = null
+      while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+        val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
+        fetchedMessage = fetched.messageSet(topic, 0)
+      }
+      TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetchedMessage.map(m => m.message).iterator)
 
       server.shutdown()
       Utils.rm(server.config.logDir)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Mon Feb 13 03:58:37 2012
@@ -186,23 +186,21 @@ object TestUtils {
       length += 1
       assertEquals(expected.next, actual.next)
     }
-    
-    if (expected.hasNext)
-    {
+
+    // check if the expected iterator is longer
+    if (expected.hasNext) {
      var length1 = length;
-     while (expected.hasNext)
-     {
+     while (expected.hasNext) {
        expected.next
        length1 += 1
      }
      assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true);
     }
-    
-    if (actual.hasNext)
-    {
+
+    // check if the actual iterator is longer
+    if (actual.hasNext) {
      var length2 = length;
-     while (actual.hasNext)
-     {
+     while (actual.hasNext) {
        actual.next
        length2 += 1
      }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala Mon Feb 13 03:58:37 2012
@@ -77,7 +77,7 @@ class ZKLoadBalanceTest extends JUnit3Su
 
 
       // wait a bit to make sure rebalancing logic is triggered
-      Thread.sleep(1000)
+      Thread.sleep(1500)
       // check Partition Owner Registry
       val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
       val expected_3 = List( ("200-0", "group1_consumer1-0"),

Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/KafkaProperties.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/KafkaProperties.java?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/KafkaProperties.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/KafkaProperties.java Mon Feb 13 03:58:37 2012
@@ -28,4 +28,5 @@ public interface KafkaProperties
   final static int reconnectInterval = 10000;
   final static String topic2 = "topic2";
   final static String topic3 = "topic3";
+  final static String clientId = "SimpleConsumerDemoClient";
 }

Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java Mon Feb 13 03:58:37 2012
@@ -16,71 +16,76 @@
  */
 package kafka.examples;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import kafka.javaapi.MultiFetchResponse;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.javaapi.FetchResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.javaapi.message.MessageSet;
 import kafka.message.MessageAndOffset;
-import scala.collection.Iterator;
 
-import kafka.api.FetchRequest;
-import kafka.message.Message;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 
-public class SimpleConsumerDemo
-{
-  private static void printMessages(ByteBufferMessageSet messageSet)
-  {
+public class SimpleConsumerDemo {
+    
+  private static void printMessages(ByteBufferMessageSet messageSet) {
     for (MessageAndOffset messageAndOffset : messageSet) {
       System.out.println(ExampleUtils.getMessage(messageAndOffset.message()));
     }
   }
 
-  private static void generateData()
-  {
+  private static void generateData() {
     Producer producer2 = new Producer(KafkaProperties.topic2);
     producer2.start();
     Producer producer3 = new Producer(KafkaProperties.topic3);
     producer3.start();
-    try
-    {
+    try {
       Thread.sleep(1000);
-    }
-    catch (InterruptedException e)
-    {
+    } catch (InterruptedException e) {
       e.printStackTrace();
     }
   }
   
-  public static void main(String[] args)
-  {
-    
+  public static void main(String[] args) {
     generateData();
+      
     SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL,
                                                        KafkaProperties.kafkaServerPort,
                                                        KafkaProperties.connectionTimeOut,
                                                        KafkaProperties.kafkaProducerBufferSize);
 
     System.out.println("Testing single fetch");
-    FetchRequest req = new FetchRequest(KafkaProperties.topic2, 0, 0L, 100);
-    ByteBufferMessageSet messageSet = simpleConsumer.fetch(req);
-    printMessages(messageSet);
+    FetchRequest req = new FetchRequestBuilder()
+            .correlationId(0)
+            .clientId(KafkaProperties.clientId)
+            .addFetch(KafkaProperties.topic2, 0, 0L, 100)
+            .build();
+    FetchResponse fetchResponse = simpleConsumer.fetch(req);
+      printMessages((ByteBufferMessageSet) fetchResponse.messageSet(KafkaProperties.topic2, 0));
 
     System.out.println("Testing single multi-fetch");
-    req = new FetchRequest(KafkaProperties.topic2, 0, 0L, 100);
-    List<FetchRequest> list = new ArrayList<FetchRequest>();
-    list.add(req);
-    req = new FetchRequest(KafkaProperties.topic3, 0, 0L, 100);
-    list.add(req);
-    MultiFetchResponse response = simpleConsumer.multifetch(list);
+    Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>() {{
+        put(KafkaProperties.topic2, new ArrayList<Integer>(){{ add(0); }});
+        put(KafkaProperties.topic3, new ArrayList<Integer>(){{ add(0); }});
+    }};
+    req = new FetchRequestBuilder()
+            .correlationId(0)
+            .clientId(KafkaProperties.clientId)
+            .addFetch(KafkaProperties.topic2, 0, 0L, 100)
+            .addFetch(KafkaProperties.topic3, 0, 0L, 100)
+            .build();
+    fetchResponse = simpleConsumer.fetch(req);
     int fetchReq = 0;
-    for (ByteBufferMessageSet resMessageSet : response )
-    {
-      System.out.println("Response from fetch request no: " + ++fetchReq);
-      printMessages(resMessageSet);
+    for ( Map.Entry<String, List<Integer>> entry : topicMap.entrySet() ) {
+      String topic = entry.getKey();
+      for ( Integer offset : entry.getValue()) {
+        System.out.println("Response from fetch request no: " + ++fetchReq);
+        printMessages((ByteBufferMessageSet) fetchResponse.messageSet(topic, offset));
+      }
     }
   }
-
 }

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala Mon Feb 13 03:58:37 2012
@@ -18,13 +18,12 @@
 package kafka.perf
 
 import java.net.URI
-import joptsimple._
-import kafka.utils._
-import kafka.server._
+import java.text.SimpleDateFormat
+import kafka.api.{FetchRequestBuilder, OffsetRequest}
 import kafka.consumer.SimpleConsumer
+import kafka.utils._
 import org.apache.log4j.Logger
-import kafka.api.{OffsetRequest, FetchRequest}
-import java.text.SimpleDateFormat
+import kafka.message.ByteBufferMessageSet
 
 /**
  * Performance test for the simple consumer
@@ -56,12 +55,20 @@ object SimpleConsumerPerformance {
     var lastReportTime: Long = startMs
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
+    var reqId = 0
     while(!done) {
-      val messages = consumer.fetch(new FetchRequest(config.topic, config.partition, offset, config.fetchSize))
+      // TODO: add in the maxWait and minBytes for performance
+      val request = new FetchRequestBuilder()
+        .correlationId(reqId)
+        .clientId(config.clientId)
+        .addFetch(config.topic, config.partition, offset, config.fetchSize)
+        .build()
+      val fetchResponse = consumer.fetch(request)
+
       var messagesRead = 0
       var bytesRead = 0
-
-      for(message <- messages) {
+      val messageSet = fetchResponse.messageSet(config.topic, config.partition)
+      for (message <- messageSet) {
         messagesRead += 1
         bytesRead += message.message.payloadSize
       }
@@ -69,7 +76,8 @@ object SimpleConsumerPerformance {
       if(messagesRead == 0 || totalMessagesRead > config.numMessages)
         done = true
       else
-        offset += messages.validBytes
+        // only one topic/partition was fetched, so advance the offset by that message set's valid bytes
+        offset += messageSet.validBytes
       
       totalBytesRead += bytesRead
       totalMessagesRead += messagesRead
@@ -89,6 +97,7 @@ object SimpleConsumerPerformance {
         lastMessagesRead = totalMessagesRead
         consumedInterval = 0
       }
+      reqId += 1
     }
     val reportTime = System.currentTimeMillis
     val elapsed = (reportTime - startMs) / 1000.0
@@ -119,6 +128,11 @@ object SimpleConsumerPerformance {
                            .describedAs("bytes")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1024*1024)
+    val clientIdOpt = parser.accepts("clientId", "The ID of this client.")
+                           .withOptionalArg
+                           .describedAs("clientId")
+                           .ofType(classOf[String])
+                           .defaultsTo("SimpleConsumerPerformanceClient")
 
     val options = parser.parse(args : _*)
 
@@ -139,5 +153,6 @@ object SimpleConsumerPerformance {
     val showDetailedStats = options.has(showDetailedStatsOpt)
     val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
     val hideHeader = options.has(hideHeaderOpt)
+    val clientId = options.valueOf(clientIdOpt).toString
   }
 }