You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/13 04:58:39 UTC
svn commit: r1243407 [2/2] - in /incubator/kafka/branches/0.8:
contrib/hadoop-consumer/src/main/java/kafka/etl/
core/src/main/scala/kafka/api/ core/src/main/scala/kafka/consumer/
core/src/main/scala/kafka/javaapi/ core/src/main/scala/kafka/javaapi/cons...
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala Mon Feb 13 03:58:37 2012
@@ -18,15 +18,15 @@
package kafka.javaapi.integration
import scala.collection._
-import kafka.api.FetchRequest
+import kafka.api.FetchRequestBuilder
import kafka.common.{InvalidPartitionException, OffsetOutOfRangeException}
-import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
-import kafka.javaapi.message.ByteBufferMessageSet
import kafka.javaapi.ProducerRequest
+import kafka.javaapi.message.ByteBufferMessageSet
import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message}
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
import kafka.utils.TestUtils
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
/**
* End to end tests of the primitive apis against a local server
@@ -43,39 +43,42 @@ class PrimitiveApiTest extends JUnit3Sui
// send some messages
val topic = "test"
-// send an empty messageset first
- val sent2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = getMessageList(Seq.empty[Message]: _*))
+ // send an empty messageset first
+ val sent2 = new ByteBufferMessageSet(NoCompressionCodec, getMessageList(Seq.empty[Message]: _*))
producer.send(topic, sent2)
+
Thread.sleep(200)
sent2.getBuffer.rewind
- var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
- TestUtils.checkEquals(sent2.iterator, fetched2.iterator)
+ val fetched2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+ val fetchedMessage2 = fetched2.messageSet(topic, 0)
+ TestUtils.checkEquals(sent2.iterator, fetchedMessage2.iterator)
// send some messages
- val sent3 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = getMessageList(new Message("hello".getBytes()),
- new Message("there".getBytes())))
+ val sent3 = new ByteBufferMessageSet(NoCompressionCodec,
+ getMessageList(
+ new Message("hello".getBytes()),new Message("there".getBytes())))
producer.send(topic, sent3)
Thread.sleep(200)
sent3.getBuffer.rewind
- var fetched3: ByteBufferMessageSet = null
- while(fetched3 == null || fetched3.validBytes == 0)
- fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
- TestUtils.checkEquals(sent3.iterator, fetched3.iterator)
+ var messageSet: ByteBufferMessageSet = null
+ while(messageSet == null || messageSet.validBytes == 0) {
+ val fetched3 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+ messageSet = fetched3.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet]
+ }
+ TestUtils.checkEquals(sent3.iterator, messageSet.iterator)
// temporarily set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.FATAL)
// send an invalid offset
try {
- val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
- fetchedWithError.iterator
- fail("expect exception")
- }
- catch {
+ val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
+ val messageWithError = fetchedWithError.messageSet(topic, 0)
+ messageWithError.iterator
+ fail("Fetch with invalid offset should throw an exception when iterating over response")
+ } catch {
case e: OffsetOutOfRangeException => "this is good"
}
@@ -87,39 +90,42 @@ class PrimitiveApiTest extends JUnit3Sui
// send some messages
val topic = "test"
-// send an empty messageset first
- val sent2 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
- messages = getMessageList(Seq.empty[Message]: _*))
+ // send an empty messageset first
+ val sent2 = new ByteBufferMessageSet(DefaultCompressionCodec, getMessageList(Seq.empty[Message]: _*))
producer.send(topic, sent2)
+
Thread.sleep(200)
sent2.getBuffer.rewind
- var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
- TestUtils.checkEquals(sent2.iterator, fetched2.iterator)
+ val fetched2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+ val message2 = fetched2.messageSet(topic, 0)
+ TestUtils.checkEquals(sent2.iterator, message2.iterator)
// send some messages
- val sent3 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
- messages = getMessageList(new Message("hello".getBytes()),
- new Message("there".getBytes())))
+ val sent3 = new ByteBufferMessageSet( DefaultCompressionCodec,
+ getMessageList(
+ new Message("hello".getBytes()),new Message("there".getBytes())))
producer.send(topic, sent3)
Thread.sleep(200)
sent3.getBuffer.rewind
- var fetched3: ByteBufferMessageSet = null
- while(fetched3 == null || fetched3.validBytes == 0)
- fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
- TestUtils.checkEquals(sent3.iterator, fetched3.iterator)
+ var fetchedMessage: ByteBufferMessageSet = null
+ while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+ val fetched3 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+ fetchedMessage = fetched3.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet]
+ }
+ TestUtils.checkEquals(sent3.iterator, fetchedMessage.iterator)
// temporarily set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.FATAL)
// send an invalid offset
try {
- val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
- fetchedWithError.iterator
- fail("expect exception")
- }
- catch {
+ val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
+ val messageWithError = fetchedWithError.messageSet(topic, 0)
+ messageWithError.iterator
+ fail("Fetch with invalid offset should throw an exception when iterating over response")
+ } catch {
case e: OffsetOutOfRangeException => "this is good"
}
@@ -129,31 +135,27 @@ class PrimitiveApiTest extends JUnit3Sui
def testProduceAndMultiFetch() {
// send some messages
- val topics = List("test1", "test2", "test3");
+ val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
- val fetches = new mutable.ArrayBuffer[FetchRequest]
- for(topic <- topics) {
+ val builder = new FetchRequestBuilder()
+ for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
set.getBuffer.rewind
- fetches += new FetchRequest(topic, 0, 0, 10000)
+ builder.addFetch(topic, partition, 0, 10000)
}
// wait a bit for produced message to be available
Thread.sleep(200)
- val response = consumer.multifetch(getFetchRequestList(fetches: _*))
- val iter = response.iterator
- for(topic <- topics) {
- if (iter.hasNext) {
- val resp = iter.next
- TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
- }
- else
- fail("fewer responses than expected")
+ val request = builder.build()
+ val response = consumer.fetch(request)
+ for( (topic, partition) <- topics) {
+ val messageSet = response.messageSet(topic, partition)
+ TestUtils.checkEquals(messages(topic).iterator, messageSet.iterator)
}
}
@@ -162,37 +164,41 @@ class PrimitiveApiTest extends JUnit3Sui
{
// send some invalid offsets
- val fetches = new mutable.ArrayBuffer[FetchRequest]
- for(topic <- topics)
- fetches += new FetchRequest(topic, 0, -1, 10000)
-
- try {
- val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
- val iter = responses.iterator
- while (iter.hasNext)
- iter.next.iterator
- fail("expect exception")
- }
- catch {
- case e: OffsetOutOfRangeException => "this is good"
+ val builder = new FetchRequestBuilder()
+ for( (topic, partition) <- topics)
+ builder.addFetch(topic, partition, -1, 10000)
+
+ val request = builder.build()
+ val response = consumer.fetch(request)
+ for( (topic, partition) <- topics) {
+ try {
+ val iter = response.messageSet(topic, partition).iterator
+ while (iter.hasNext)
+ iter.next
+ fail("MessageSet for invalid offset should throw exception")
+ } catch {
+ case e: OffsetOutOfRangeException => "this is good"
+ }
}
}
{
// send some invalid partitions
- val fetches = new mutable.ArrayBuffer[FetchRequest]
- for(topic <- topics)
- fetches += new FetchRequest(topic, -1, 0, 10000)
-
- try {
- val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
- val iter = responses.iterator
- while (iter.hasNext)
- iter.next.iterator
- fail("expect exception")
- }
- catch {
- case e: InvalidPartitionException => "this is good"
+ val builder = new FetchRequestBuilder()
+ for( (topic, _) <- topics)
+ builder.addFetch(topic, -1, 0, 10000)
+
+ val request = builder.build()
+ val response = consumer.fetch(request)
+ for( (topic, _) <- topics) {
+ try {
+ val iter = response.messageSet(topic, -1).iterator
+ while (iter.hasNext)
+ iter.next
+ fail("MessageSet for invalid partition should throw exception")
+ } catch {
+ case e: InvalidPartitionException => "this is good"
+ }
}
}
@@ -202,31 +208,31 @@ class PrimitiveApiTest extends JUnit3Sui
def testProduceAndMultiFetchWithCompression() {
// send some messages
- val topics = List("test1", "test2", "test3");
+ val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
- val fetches = new mutable.ArrayBuffer[FetchRequest]
- for(topic <- topics) {
+ val builder = new FetchRequestBuilder()
+ for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
set.getBuffer.rewind
- fetches += new FetchRequest(topic, 0, 0, 10000)
+ builder.addFetch(topic, partition, 0, 10000)
}
// wait a bit for produced message to be available
Thread.sleep(200)
- val response = consumer.multifetch(getFetchRequestList(fetches: _*))
- val iter = response.iterator
- for(topic <- topics) {
+ val request = builder.build()
+ val response = consumer.fetch(request)
+ for( (topic, partition) <- topics) {
+ val iter = response.messageSet(topic, partition).iterator
if (iter.hasNext) {
- val resp = iter.next
- TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
- }
- else
+ TestUtils.checkEquals(messages(topic).iterator, iter)
+ } else {
fail("fewer responses than expected")
+ }
}
}
@@ -235,37 +241,41 @@ class PrimitiveApiTest extends JUnit3Sui
{
// send some invalid offsets
- val fetches = new mutable.ArrayBuffer[FetchRequest]
- for(topic <- topics)
- fetches += new FetchRequest(topic, 0, -1, 10000)
-
- try {
- val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
- val iter = responses.iterator
- while (iter.hasNext)
- iter.next.iterator
- fail("expect exception")
- }
- catch {
- case e: OffsetOutOfRangeException => "this is good"
+ val builder = new FetchRequestBuilder()
+ for( (topic, partition) <- topics)
+ builder.addFetch(topic, partition, -1, 10000)
+
+ val request = builder.build()
+ val response = consumer.fetch(request)
+ for( (topic, partition) <- topics) {
+ try {
+ val iter = response.messageSet(topic, partition).iterator
+ while (iter.hasNext)
+ iter.next
+ fail("Expected exception when fetching invalid offset")
+ } catch {
+ case e: OffsetOutOfRangeException => "this is good"
+ }
}
}
{
// send some invalid partitions
- val fetches = new mutable.ArrayBuffer[FetchRequest]
- for(topic <- topics)
- fetches += new FetchRequest(topic, -1, 0, 10000)
-
- try {
- val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
- val iter = responses.iterator
- while (iter.hasNext)
- iter.next.iterator
- fail("expect exception")
- }
- catch {
- case e: InvalidPartitionException => "this is good"
+ val builder = new FetchRequestBuilder()
+ for( (topic, _) <- topics)
+ builder.addFetch(topic, -1, 0, 10000)
+
+ val request = builder.build()
+ val response = consumer.fetch(request)
+ for( (topic, _) <- topics) {
+ try {
+ val iter = response.messageSet(topic, -1).iterator
+ while (iter.hasNext)
+ iter.next
+ fail("Expected exception when fetching invalid partition")
+ } catch {
+ case e: InvalidPartitionException => "this is good"
+ }
}
}
@@ -275,79 +285,75 @@ class PrimitiveApiTest extends JUnit3Sui
def testProduceAndMultiFetchJava() {
// send some messages
- val topics = List("test1", "test2", "test3");
+ val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
- val fetches : java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest]
- for(topic <- topics) {
+ val builder = new FetchRequestBuilder()
+ for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
set.getBuffer.rewind
- fetches.add(new FetchRequest(topic, 0, 0, 10000))
+ builder.addFetch(topic, partition, 0, 10000)
}
// wait a bit for produced message to be available
Thread.sleep(200)
- val response = consumer.multifetch(fetches)
- val iter = response.iterator
- for(topic <- topics) {
+ val request = builder.build()
+ val response = consumer.fetch(request)
+ for( (topic, partition) <- topics) {
+ val iter = response.messageSet(topic, partition).iterator
if (iter.hasNext) {
- val resp = iter.next
- TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
- }
- else
+ TestUtils.checkEquals(messages(topic).iterator, iter)
+ } else {
fail("fewer responses than expected")
+ }
}
}
}
def testProduceAndMultiFetchJavaWithCompression() {
// send some messages
- val topics = List("test1", "test2", "test3");
+ val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
{
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
- val fetches : java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest]
- for(topic <- topics) {
+ val builder = new FetchRequestBuilder()
+ for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
set.getBuffer.rewind
- fetches.add(new FetchRequest(topic, 0, 0, 10000))
+ builder.addFetch(topic, partition, 0, 10000)
}
// wait a bit for produced message to be available
Thread.sleep(200)
- val response = consumer.multifetch(fetches)
- val iter = response.iterator
- for(topic <- topics) {
- if (iter.hasNext) {
- val resp = iter.next
- TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
- }
- else
- fail("fewer responses than expected")
+ val request = builder.build()
+ val response = consumer.fetch(request)
+ for( (topic, partition) <- topics) {
+ val iter = response.messageSet(topic, partition).iterator
+ TestUtils.checkEquals(messages(topic).iterator, iter)
}
}
}
def testMultiProduce() {
// send some messages
- val topics = List("test1", "test2", "test3");
+ val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
- val fetches = new mutable.ArrayBuffer[FetchRequest]
+ val builder = new FetchRequestBuilder()
var produceList: List[ProducerRequest] = Nil
- for(topic <- topics) {
+ for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
produceList ::= new ProducerRequest(topic, 0, set)
- fetches += new FetchRequest(topic, 0, 0, 10000)
+ builder.addFetch(topic, partition, 0, 10000)
}
producer.multiSend(produceList.toArray)
@@ -356,31 +362,31 @@ class PrimitiveApiTest extends JUnit3Sui
// wait a bit for produced message to be available
Thread.sleep(200)
- val response = consumer.multifetch(getFetchRequestList(fetches: _*))
- val iter = response.iterator
- for(topic <- topics) {
+ val request = builder.build()
+ val response = consumer.fetch(request)
+ for( (topic, partition) <- topics) {
+ val iter = response.messageSet(topic, partition).iterator
if (iter.hasNext) {
- val resp = iter.next
- TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
- }
- else
+ TestUtils.checkEquals(messages(topic).iterator, iter)
+ } else {
fail("fewer responses than expected")
+ }
}
}
def testMultiProduceWithCompression() {
// send some messages
- val topics = List("test1", "test2", "test3");
+ val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, ByteBufferMessageSet]
- val fetches = new mutable.ArrayBuffer[FetchRequest]
+ val builder = new FetchRequestBuilder()
var produceList: List[ProducerRequest] = Nil
- for(topic <- topics) {
+ for( (topic, partition) <- topics) {
val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
messages = getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
messages += topic -> set
produceList ::= new ProducerRequest(topic, 0, set)
- fetches += new FetchRequest(topic, 0, 0, 10000)
+ builder.addFetch(topic, partition, 0, 10000)
}
producer.multiSend(produceList.toArray)
@@ -389,15 +395,15 @@ class PrimitiveApiTest extends JUnit3Sui
// wait a bit for produced message to be available
Thread.sleep(200)
- val response = consumer.multifetch(getFetchRequestList(fetches: _*))
- val iter = response.iterator
- for(topic <- topics) {
+ val request = builder.build()
+ val response = consumer.fetch(request)
+ for( (topic, partition) <- topics) {
+ val iter = response.messageSet(topic, partition).iterator
if (iter.hasNext) {
- val resp = iter.next
- TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
- }
- else
+ TestUtils.checkEquals(messages(topic).iterator, iter)
+ } else {
fail("fewer responses than expected")
+ }
}
}
@@ -406,10 +412,4 @@ class PrimitiveApiTest extends JUnit3Sui
messages.foreach(m => messageList.add(m))
messageList
}
-
- private def getFetchRequestList(fetches: FetchRequest*): java.util.List[FetchRequest] = {
- val fetchReqs = new java.util.ArrayList[FetchRequest]()
- fetches.foreach(f => fetchReqs.add(f))
- fetchReqs
- }
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Mon Feb 13 03:58:37 2012
@@ -22,7 +22,6 @@ import kafka.utils._
import kafka.server.{KafkaConfig, KafkaServer}
import junit.framework.Assert._
import java.util.{Random, Properties}
-import kafka.api.{FetchRequest, OffsetRequest}
import collection.mutable.WrappedArray
import kafka.consumer.SimpleConsumer
import org.junit.{After, Before, Test}
@@ -30,6 +29,7 @@ import kafka.message.{NoCompressionCodec
import org.apache.log4j._
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
+import kafka.api.{FetchRequestBuilder, OffsetRequest}
object LogOffsetTest {
val random = new Random()
@@ -66,9 +66,8 @@ class LogOffsetTest extends JUnit3Suite
@Test
def testEmptyLogs() {
- val messageSet: ByteBufferMessageSet = simpleConsumer.fetch(
- new FetchRequest("test", 0, 0, 300 * 1024))
- assertFalse(messageSet.iterator.hasNext)
+ val fetchResponse = simpleConsumer.fetch(new FetchRequestBuilder().addFetch("test", 0, 0, 300 * 1024).build())
+ assertFalse(fetchResponse.messageSet("test", 0).iterator.hasNext)
val name = "test"
val logFile = new File(logDir, name + "-0")
@@ -119,9 +118,9 @@ class LogOffsetTest extends JUnit3Suite
assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
// try to fetch using latest offset
- val messageSet: ByteBufferMessageSet = simpleConsumer.fetch(
- new FetchRequest(topic, 0, consumerOffsets.head, 300 * 1024))
- assertFalse(messageSet.iterator.hasNext)
+ val fetchResponse = simpleConsumer.fetch(
+ new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build())
+ assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext)
}
@Test
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Mon Feb 13 03:58:37 2012
@@ -17,23 +17,21 @@
package kafka.log4j
-import org.apache.log4j.spi.LoggingEvent
-import org.apache.log4j.{PropertyConfigurator, Logger}
import java.util.Properties
import java.io.File
-import kafka.consumer.SimpleConsumer
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.TestZKUtils
-import kafka.zk.EmbeddedZookeeper
import junit.framework.Assert._
-import kafka.api.FetchRequest
-import kafka.serializer.Encoder
+import kafka.api.FetchRequestBuilder
+import kafka.consumer.SimpleConsumer
import kafka.message.Message
import kafka.producer.async.MissingConfigException
+import kafka.serializer.Encoder
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{TestUtils, TestZKUtils, Utils, Logging}
+import kafka.zk.{EmbeddedZookeeper, ZooKeeperTestHarness}
+import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.{PropertyConfigurator, Logger}
import org.junit.{After, Before, Test}
import org.scalatest.junit.JUnit3Suite
-import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestUtils, Utils, Logging}
class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
@@ -172,10 +170,10 @@ class KafkaLog4jAppenderTest extends JUn
Thread.sleep(2500)
var offset = 0L
- val messages = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024))
-
+ val response = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, offset, 1024*1024).build())
+ val fetchedMessage = response.messageSet("test-topic", 0)
var count = 0
- for(message <- messages) {
+ for(message <- fetchedMessage) {
count = count + 1
offset += message.offset
}
@@ -192,14 +190,16 @@ class KafkaLog4jAppenderTest extends JUn
Thread.sleep(500)
- val messages = simpleConsumerZk.fetch(new FetchRequest("test-topic", 0, 0L, 1024*1024))
+ val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build())
+ val fetchMessage = response.messageSet("test-topic", 0)
var count = 0
- for(message <- messages) {
+ for(message <- fetchMessage) {
count = count + 1
}
- val messagesFromOtherBroker = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, 0L, 1024*1024))
+ val response2 = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build())
+ val messagesFromOtherBroker = response2.messageSet("test-topic", 0)
for(message <- messagesFromOtherBroker) {
count = count + 1
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Mon Feb 13 03:58:37 2012
@@ -17,18 +17,18 @@
package kafka.producer
-import org.apache.log4j.{Logger, Level}
-import kafka.zk.EmbeddedZookeeper
-import org.junit.{After, Before, Test}
import junit.framework.Assert._
-import org.scalatest.junit.JUnitSuite
-import kafka.utils.{TestUtils, TestZKUtils, Utils}
-import kafka.api.FetchRequest
+import java.util.Properties
+import kafka.api.FetchRequestBuilder
+import kafka.consumer.SimpleConsumer
import kafka.message.Message
import kafka.serializer.Encoder
-import kafka.consumer.SimpleConsumer
-import java.util.Properties
import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
+import kafka.utils.{TestUtils, TestZKUtils, Utils}
+import kafka.zk.EmbeddedZookeeper
+import org.apache.log4j.{Logger, Level}
+import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnitSuite
class ProducerTest extends JUnitSuite {
private val topic = "test-topic"
@@ -106,12 +106,14 @@ class ProducerTest extends JUnitSuite {
producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
Thread.sleep(100)
// cross check if brokers got the messages
- val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
- assertTrue("Message set should have 1 message", messageSet1.hasNext)
- assertEquals(new Message("test1".getBytes), messageSet1.next.message)
- val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
- assertTrue("Message set should have 1 message", messageSet2.hasNext)
- assertEquals(new Message("test1".getBytes), messageSet2.next.message)
+ val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+ val messageSet1 = response1.messageSet("new-topic", 0)
+ assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext)
+ assertEquals(new Message("test1".getBytes), messageSet1.head.message)
+ val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+ val messageSet2 = response2.messageSet("new-topic", 0)
+ assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext)
+ assertEquals(new Message("test1".getBytes), messageSet2.head.message)
} catch {
case e: Exception => fail("Not expected", e)
}
@@ -142,11 +144,12 @@ class ProducerTest extends JUnitSuite {
producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
Thread.sleep(100)
// cross check if brokers got the messages
- val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
- assertTrue("Message set should have 1 message", messageSet1.hasNext)
- assertEquals(new Message("test1".getBytes), messageSet1.next.message)
- assertTrue("Message set should have another message", messageSet1.hasNext)
- assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+ val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+ val messageSet1Iter = response1.messageSet("new-topic", 0).iterator
+ assertTrue("Message set should have 1 message", messageSet1Iter.hasNext)
+ assertEquals(new Message("test1".getBytes), messageSet1Iter.next.message)
+ assertTrue("Message set should have another message", messageSet1Iter.hasNext)
+ assertEquals(new Message("test1".getBytes), messageSet1Iter.next.message)
} catch {
case e: Exception => fail("Not expected")
}
@@ -174,9 +177,10 @@ class ProducerTest extends JUnitSuite {
producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
Thread.sleep(100)
// cross check if brokers got the messages
- val messageSet1 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
- assertTrue("Message set should have 1 message", messageSet1.hasNext)
- assertEquals(new Message("test".getBytes), messageSet1.next.message)
+ val response1 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+ val messageSet1 = response1.messageSet("new-topic", 0)
+ assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext)
+ assertEquals(new Message("test".getBytes), messageSet1.head.message)
// shutdown server2
server2.shutdown
@@ -197,9 +201,10 @@ class ProducerTest extends JUnitSuite {
Thread.sleep(100)
// cross check if brokers got the messages
- val messageSet2 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
- assertTrue("Message set should have 1 message", messageSet2.hasNext)
- assertEquals(new Message("test".getBytes), messageSet2.next.message)
+ val response2 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+ val messageSet2 = response2.messageSet("new-topic", 0)
+ assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext)
+ assertEquals(new Message("test".getBytes), messageSet2.head.message)
} catch {
case e: Exception => fail("Not expected", e)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Mon Feb 13 03:58:37 2012
@@ -17,7 +17,6 @@
package kafka.server
import java.io.File
-import kafka.api.FetchRequest
import kafka.producer.{SyncProducer, SyncProducerConfig}
import kafka.consumer.SimpleConsumer
import java.util.Properties
@@ -27,6 +26,7 @@ import kafka.message.{NoCompressionCodec
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{TestUtils, Utils}
+import kafka.api.{FetchResponse, FetchRequestBuilder, FetchRequest}
class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
val port = TestUtils.choosePort
@@ -82,11 +82,13 @@ class ServerShutdownTest extends JUnit3S
server.startup()
// bring the server back again and read the messages
- var fetched: ByteBufferMessageSet = null
- while(fetched == null || fetched.validBytes == 0)
- fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
- TestUtils.checkEquals(sent1.iterator, fetched.iterator)
- val newOffset = fetched.validBytes
+ var fetchedMessage: ByteBufferMessageSet = null
+ while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+ val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+ fetchedMessage = fetched.messageSet(topic, 0)
+ }
+ TestUtils.checkEquals(sent1.iterator, fetchedMessage.iterator)
+ val newOffset = fetchedMessage.validBytes
// send some more messages
producer.send(topic, sent2)
@@ -94,10 +96,12 @@ class ServerShutdownTest extends JUnit3S
Thread.sleep(200)
- fetched = null
- while(fetched == null || fetched.validBytes == 0)
- fetched = consumer.fetch(new FetchRequest(topic, 0, newOffset, 10000))
- TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetched.map(m => m.message).iterator)
+ fetchedMessage = null
+ while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+ val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
+ fetchedMessage = fetched.messageSet(topic, 0)
+ }
+ TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetchedMessage.map(m => m.message).iterator)
server.shutdown()
Utils.rm(server.config.logDir)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Mon Feb 13 03:58:37 2012
@@ -186,23 +186,21 @@ object TestUtils {
length += 1
assertEquals(expected.next, actual.next)
}
-
- if (expected.hasNext)
- {
+
+ // check if the expected iterator is longer
+ if (expected.hasNext) {
var length1 = length;
- while (expected.hasNext)
- {
+ while (expected.hasNext) {
expected.next
length1 += 1
}
assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true);
}
-
- if (actual.hasNext)
- {
+
+ // check if the actual iterator is longer
+ if (actual.hasNext) {
var length2 = length;
- while (actual.hasNext)
- {
+ while (actual.hasNext) {
actual.next
length2 += 1
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala Mon Feb 13 03:58:37 2012
@@ -77,7 +77,7 @@ class ZKLoadBalanceTest extends JUnit3Su
// wait a bit to make sure rebalancing logic is triggered
- Thread.sleep(1000)
+ Thread.sleep(1500)
// check Partition Owner Registry
val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_3 = List( ("200-0", "group1_consumer1-0"),
Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/KafkaProperties.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/KafkaProperties.java?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/KafkaProperties.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/KafkaProperties.java Mon Feb 13 03:58:37 2012
@@ -28,4 +28,5 @@ public interface KafkaProperties
final static int reconnectInterval = 10000;
final static String topic2 = "topic2";
final static String topic3 = "topic3";
+ final static String clientId = "SimpleConsumerDemoClient";
}
Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java Mon Feb 13 03:58:37 2012
@@ -16,71 +16,76 @@
*/
package kafka.examples;
-import java.util.ArrayList;
-import java.util.List;
-
-import kafka.javaapi.MultiFetchResponse;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.javaapi.message.MessageSet;
import kafka.message.MessageAndOffset;
-import scala.collection.Iterator;
-import kafka.api.FetchRequest;
-import kafka.message.Message;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
-public class SimpleConsumerDemo
-{
- private static void printMessages(ByteBufferMessageSet messageSet)
- {
+public class SimpleConsumerDemo {
+
+ private static void printMessages(ByteBufferMessageSet messageSet) {
for (MessageAndOffset messageAndOffset : messageSet) {
System.out.println(ExampleUtils.getMessage(messageAndOffset.message()));
}
}
- private static void generateData()
- {
+ private static void generateData() {
Producer producer2 = new Producer(KafkaProperties.topic2);
producer2.start();
Producer producer3 = new Producer(KafkaProperties.topic3);
producer3.start();
- try
- {
+ try {
Thread.sleep(1000);
- }
- catch (InterruptedException e)
- {
+ } catch (InterruptedException e) {
e.printStackTrace();
}
}
- public static void main(String[] args)
- {
-
+ public static void main(String[] args) {
generateData();
+
SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL,
KafkaProperties.kafkaServerPort,
KafkaProperties.connectionTimeOut,
KafkaProperties.kafkaProducerBufferSize);
System.out.println("Testing single fetch");
- FetchRequest req = new FetchRequest(KafkaProperties.topic2, 0, 0L, 100);
- ByteBufferMessageSet messageSet = simpleConsumer.fetch(req);
- printMessages(messageSet);
+ FetchRequest req = new FetchRequestBuilder()
+ .correlationId(0)
+ .clientId(KafkaProperties.clientId)
+ .addFetch(KafkaProperties.topic2, 0, 0L, 100)
+ .build();
+ FetchResponse fetchResponse = simpleConsumer.fetch(req);
+ printMessages((ByteBufferMessageSet) fetchResponse.messageSet(KafkaProperties.topic2, 0));
System.out.println("Testing single multi-fetch");
- req = new FetchRequest(KafkaProperties.topic2, 0, 0L, 100);
- List<FetchRequest> list = new ArrayList<FetchRequest>();
- list.add(req);
- req = new FetchRequest(KafkaProperties.topic3, 0, 0L, 100);
- list.add(req);
- MultiFetchResponse response = simpleConsumer.multifetch(list);
+ Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>() {{
+ put(KafkaProperties.topic2, new ArrayList<Integer>(){{ add(0); }});
+ put(KafkaProperties.topic3, new ArrayList<Integer>(){{ add(0); }});
+ }};
+ req = new FetchRequestBuilder()
+ .correlationId(0)
+ .clientId(KafkaProperties.clientId)
+ .addFetch(KafkaProperties.topic2, 0, 0L, 100)
+ .addFetch(KafkaProperties.topic3, 0, 0L, 100)
+ .build();
+ fetchResponse = simpleConsumer.fetch(req);
int fetchReq = 0;
- for (ByteBufferMessageSet resMessageSet : response )
- {
- System.out.println("Response from fetch request no: " + ++fetchReq);
- printMessages(resMessageSet);
+ for ( Map.Entry<String, List<Integer>> entry : topicMap.entrySet() ) {
+ String topic = entry.getKey();
+ for ( Integer offset : entry.getValue()) {
+ System.out.println("Response from fetch request no: " + ++fetchReq);
+ printMessages((ByteBufferMessageSet) fetchResponse.messageSet(topic, offset));
+ }
}
}
-
}
Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala?rev=1243407&r1=1243406&r2=1243407&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala Mon Feb 13 03:58:37 2012
@@ -18,13 +18,12 @@
package kafka.perf
import java.net.URI
-import joptsimple._
-import kafka.utils._
-import kafka.server._
+import java.text.SimpleDateFormat
+import kafka.api.{FetchRequestBuilder, OffsetRequest}
import kafka.consumer.SimpleConsumer
+import kafka.utils._
import org.apache.log4j.Logger
-import kafka.api.{OffsetRequest, FetchRequest}
-import java.text.SimpleDateFormat
+import kafka.message.ByteBufferMessageSet
/**
* Performance test for the simple consumer
@@ -56,12 +55,20 @@ object SimpleConsumerPerformance {
var lastReportTime: Long = startMs
var lastBytesRead = 0L
var lastMessagesRead = 0L
+ var reqId = 0
while(!done) {
- val messages = consumer.fetch(new FetchRequest(config.topic, config.partition, offset, config.fetchSize))
+ // TODO: add in the maxWait and minBytes for performance
+ val request = new FetchRequestBuilder()
+ .correlationId(reqId)
+ .clientId(config.clientId)
+ .addFetch(config.topic, config.partition, offset, config.fetchSize)
+ .build()
+ val fetchResponse = consumer.fetch(request)
+
var messagesRead = 0
var bytesRead = 0
-
- for(message <- messages) {
+ val messageSet = fetchResponse.messageSet(config.topic, config.partition)
+ for (message <- messageSet) {
messagesRead += 1
bytesRead += message.message.payloadSize
}
@@ -69,7 +76,8 @@ object SimpleConsumerPerformance {
if(messagesRead == 0 || totalMessagesRead > config.numMessages)
done = true
else
- offset += messages.validBytes
+ // only one topic/partition was fetched, so advance the offset by the valid bytes of that single message set
+ offset += messageSet.validBytes
totalBytesRead += bytesRead
totalMessagesRead += messagesRead
@@ -89,6 +97,7 @@ object SimpleConsumerPerformance {
lastMessagesRead = totalMessagesRead
consumedInterval = 0
}
+ reqId += 1
}
val reportTime = System.currentTimeMillis
val elapsed = (reportTime - startMs) / 1000.0
@@ -119,6 +128,11 @@ object SimpleConsumerPerformance {
.describedAs("bytes")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1024*1024)
+ val clientIdOpt = parser.accepts("clientId", "The ID of this client.")
+ .withOptionalArg
+ .describedAs("clientId")
+ .ofType(classOf[String])
+ .defaultsTo("SimpleConsumerPerformanceClient")
val options = parser.parse(args : _*)
@@ -139,5 +153,6 @@ object SimpleConsumerPerformance {
val showDetailedStats = options.has(showDetailedStatsOpt)
val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
val hideHeader = options.has(hideHeaderOpt)
+ val clientId = options.valueOf(clientIdOpt).toString
}
}