You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/02/17 20:43:55 UTC
git commit: Liars in PrimitiveApiTest that promise to test api in
compression mode, but don't do this actually; patched by Kostya Golikov;
reviewed by Guozhang Wang and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk dc5a993e6 -> f550cc76c
Liars in PrimitiveApiTest that promise to test api in compression mode, but don't do this actually; patched by Kostya Golikov; reviewed by Guozhang Wang and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f550cc76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f550cc76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f550cc76
Branch: refs/heads/trunk
Commit: f550cc76cd4aa6a1ed771c477ed8cc832520ba1a
Parents: dc5a993
Author: Kostya Golikov <jo...@gmail.com>
Authored: Mon Feb 17 11:43:45 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Feb 17 11:43:45 2014 -0800
----------------------------------------------------------------------
.../kafka/integration/PrimitiveApiTest.scala | 140 ++++++-------------
1 file changed, 39 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f550cc76/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 5f331d2..d44c3ff 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -35,12 +35,12 @@ import kafka.utils.{TestUtils, Utils}
* End to end tests of the primitive apis against a local server
*/
class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
+ val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
- val port = TestUtils.choosePort
+ val port = TestUtils.choosePort()
val props = TestUtils.createBrokerConfig(0, port)
val config = new KafkaConfig(props)
val configs = List(config)
- val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
def testFetchRequestCanProperlySerialize() {
val request = new FetchRequestBuilder()
@@ -100,7 +100,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val stringProducer1 = new Producer[String, String](config)
stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
- var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+ val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val messageSet = fetched.messageSet(topic, 0)
assertTrue(messageSet.iterator.hasNext)
@@ -108,8 +108,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
}
- def testProduceAndMultiFetch() {
- createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+ private def produceAndMultiFetch(producer: Producer[String, String]) {
+ createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"))
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
@@ -171,117 +171,56 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
requestHandlerLogger.setLevel(Level.ERROR)
}
- def testProduceAndMultiFetchWithCompression() {
- createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
-
- // send some messages
- val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
- {
- val messages = new mutable.HashMap[String, Seq[String]]
- val builder = new FetchRequestBuilder()
- for( (topic, partition) <- topics) {
- val messageList = List("a_" + topic, "b_" + topic)
- val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
- messages += topic -> messageList
- producer.send(producerData:_*)
- builder.addFetch(topic, partition, 0, 10000)
- }
-
- // wait a bit for produced message to be available
- val request = builder.build()
- val response = consumer.fetch(request)
- for( (topic, partition) <- topics) {
- val fetched = response.messageSet(topic, partition)
- assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
- }
- }
-
- // temporarily set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.FATAL)
-
- {
- // send some invalid offsets
- val builder = new FetchRequestBuilder()
- for( (topic, partition) <- topics)
- builder.addFetch(topic, partition, -1, 10000)
-
- try {
- val request = builder.build()
- val response = consumer.fetch(request)
- response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
- fail("Expected exception when fetching message with invalid offset")
- } catch {
- case e: OffsetOutOfRangeException => "this is good"
- }
- }
-
- {
- // send some invalid partitions
- val builder = new FetchRequestBuilder()
- for( (topic, _) <- topics)
- builder.addFetch(topic, -1, 0, 10000)
-
- try {
- val request = builder.build()
- val response = consumer.fetch(request)
- response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
- fail("Expected exception when fetching message with invalid partition")
- } catch {
- case e: UnknownTopicOrPartitionException => "this is good"
- }
- }
+ def testProduceAndMultiFetch() {
+ val props = producer.config.props.props
+ val config = new ProducerConfig(props)
+ val noCompressionProducer = new Producer[String, String](config)
+ produceAndMultiFetch(noCompressionProducer)
+ }
- // restore set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.ERROR)
+ def testProduceAndMultiFetchWithCompression() {
+ val props = producer.config.props.props
+ props.put("compression", "true")
+ val config = new ProducerConfig(props)
+ val producerWithCompression = new Producer[String, String](config)
+ produceAndMultiFetch(producerWithCompression)
}
- def testMultiProduce() {
- createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+ private def multiProduce(producer: Producer[String, String]) {
+ val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
+ createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
- // send some messages
- val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
- var produceList: List[KeyedMessage[String, String]] = Nil
- for( (topic, partition) <- topics) {
+ for((topic, partition) <- topics) {
val messageList = List("a_" + topic, "b_" + topic)
val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
messages += topic -> messageList
producer.send(producerData:_*)
builder.addFetch(topic, partition, 0, 10000)
}
- producer.send(produceList: _*)
val request = builder.build()
val response = consumer.fetch(request)
- for( (topic, partition) <- topics) {
+ for((topic, partition) <- topics) {
val fetched = response.messageSet(topic, partition)
assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
}
}
- def testMultiProduceWithCompression() {
- // send some messages
- val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
- val messages = new mutable.HashMap[String, Seq[String]]
- val builder = new FetchRequestBuilder()
- var produceList: List[KeyedMessage[String, String]] = Nil
- for( (topic, partition) <- topics) {
- val messageList = List("a_" + topic, "b_" + topic)
- val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
- messages += topic -> messageList
- producer.send(producerData:_*)
- builder.addFetch(topic, partition, 0, 10000)
- }
- producer.send(produceList: _*)
+ def testMultiProduce() {
+ val props = producer.config.props.props
+ val config = new ProducerConfig(props)
+ val noCompressionProducer = new Producer[String, String](config)
+ multiProduce(noCompressionProducer)
+ }
- // wait a bit for produced message to be available
- val request = builder.build()
- val response = consumer.fetch(request)
- for( (topic, partition) <- topics) {
- val fetched = response.messageSet(topic, 0)
- assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
- }
+ def testMultiProduceWithCompression() {
+ val props = producer.config.props.props
+ props.put("compression", "true")
+ val config = new ProducerConfig(props)
+ val producerWithCompression = new Producer[String, String](config)
+ multiProduce(producerWithCompression)
}
def testConsumerEmptyTopic() {
@@ -294,16 +233,15 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
}
def testPipelinedProduceRequests() {
- createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+ val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
+ createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
val props = producer.config.props.props
props.put("request.required.acks", "0")
val pipelinedProducer: Producer[String, String] = new Producer(new ProducerConfig(props))
// send some messages
- val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
- var produceList: List[KeyedMessage[String, String]] = Nil
for( (topic, partition) <- topics) {
val messageList = List("a_" + topic, "b_" + topic)
val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
@@ -338,10 +276,10 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
* For testing purposes, just create these topics each with one partition and one replica for
* which the provided broker should the leader for. Create and wait for broker to lead. Simple.
*/
- def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
+ private def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Iterable[String]) {
for( topic <- topics ) {
- AdminUtils.createTopic(zkClient, topic, 1, 1)
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+ AdminUtils.createTopic(zkClient, topic, partitions = 1, replicationFactor = 1)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 0, timeoutMs = 500)
}
}
}