You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/02/17 20:43:55 UTC

git commit: Liars in PrimitiveApiTest that promise to test the api in compression mode, but don't actually do so; patched by Kostya Golikov; reviewed by Guozhang Wang and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk dc5a993e6 -> f550cc76c


Liars in PrimitiveApiTest that promise to test the api in compression mode, but don't actually do so; patched by Kostya Golikov; reviewed by Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f550cc76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f550cc76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f550cc76

Branch: refs/heads/trunk
Commit: f550cc76cd4aa6a1ed771c477ed8cc832520ba1a
Parents: dc5a993
Author: Kostya Golikov <jo...@gmail.com>
Authored: Mon Feb 17 11:43:45 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Feb 17 11:43:45 2014 -0800

----------------------------------------------------------------------
 .../kafka/integration/PrimitiveApiTest.scala    | 140 ++++++-------------
 1 file changed, 39 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f550cc76/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 5f331d2..d44c3ff 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -35,12 +35,12 @@ import kafka.utils.{TestUtils, Utils}
  * End to end tests of the primitive apis against a local server
  */
 class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
+  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
-  val port = TestUtils.choosePort
+  val port = TestUtils.choosePort()
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props)
   val configs = List(config)
-  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
   def testFetchRequestCanProperlySerialize() {
     val request = new FetchRequestBuilder()
@@ -100,7 +100,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     val stringProducer1 = new Producer[String, String](config)
     stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
 
-    var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
+    val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
     val messageSet = fetched.messageSet(topic, 0)
     assertTrue(messageSet.iterator.hasNext)
 
@@ -108,8 +108,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
   }
 
-  def testProduceAndMultiFetch() {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+  private def produceAndMultiFetch(producer: Producer[String, String]) {
+    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"))
 
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
@@ -171,117 +171,56 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
-  def testProduceAndMultiFetchWithCompression() {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
-
-    // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    {
-      val messages = new mutable.HashMap[String, Seq[String]]
-      val builder = new FetchRequestBuilder()
-      for( (topic, partition) <- topics) {
-        val messageList = List("a_" + topic, "b_" + topic)
-        val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
-        messages += topic -> messageList
-        producer.send(producerData:_*)
-        builder.addFetch(topic, partition, 0, 10000)
-      }
-
-      // wait a bit for produced message to be available
-      val request = builder.build()
-      val response = consumer.fetch(request)
-      for( (topic, partition) <- topics) {
-        val fetched = response.messageSet(topic, partition)
-        assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
-      }
-    }
-
-    // temporarily set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.FATAL)
-
-    {
-      // send some invalid offsets
-      val builder = new FetchRequestBuilder()
-      for( (topic, partition) <- topics)
-        builder.addFetch(topic, partition, -1, 10000)
-
-      try {
-        val request = builder.build()
-        val response = consumer.fetch(request)
-        response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
-        fail("Expected exception when fetching message with invalid offset")
-      } catch {
-        case e: OffsetOutOfRangeException => "this is good"
-      }
-    }
-
-    {
-      // send some invalid partitions
-      val builder = new FetchRequestBuilder()
-      for( (topic, _) <- topics)
-        builder.addFetch(topic, -1, 0, 10000)
-
-      try {
-        val request = builder.build()
-        val response = consumer.fetch(request)
-        response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
-        fail("Expected exception when fetching message with invalid partition")
-      } catch {
-        case e: UnknownTopicOrPartitionException => "this is good"
-      }
-    }
+  def testProduceAndMultiFetch() {
+    val props = producer.config.props.props
+    val config = new ProducerConfig(props)
+    val noCompressionProducer = new Producer[String, String](config)
+    produceAndMultiFetch(noCompressionProducer)
+  }
 
-    // restore set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.ERROR)
+  def testProduceAndMultiFetchWithCompression() {
+    val props = producer.config.props.props
+    props.put("compression", "true")
+    val config = new ProducerConfig(props)
+    val producerWithCompression = new Producer[String, String](config)
+    produceAndMultiFetch(producerWithCompression)
   }
 
-  def testMultiProduce() {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+  private def multiProduce(producer: Producer[String, String]) {
+    val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
+    createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
 
-    // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
-    for( (topic, partition) <- topics) {
+    for((topic, partition) <- topics) {
       val messageList = List("a_" + topic, "b_" + topic)
       val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
       messages += topic -> messageList
       producer.send(producerData:_*)
       builder.addFetch(topic, partition, 0, 10000)
     }
-    producer.send(produceList: _*)
 
     val request = builder.build()
     val response = consumer.fetch(request)
-    for( (topic, partition) <- topics) {
+    for((topic, partition) <- topics) {
       val fetched = response.messageSet(topic, partition)
       assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
     }
   }
 
-  def testMultiProduceWithCompression() {
-    // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    val messages = new mutable.HashMap[String, Seq[String]]
-    val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
-    for( (topic, partition) <- topics) {
-      val messageList = List("a_" + topic, "b_" + topic)
-      val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
-      messages += topic -> messageList
-      producer.send(producerData:_*)
-      builder.addFetch(topic, partition, 0, 10000)
-    }
-    producer.send(produceList: _*)
+  def testMultiProduce() {
+    val props = producer.config.props.props
+    val config = new ProducerConfig(props)
+    val noCompressionProducer = new Producer[String, String](config)
+    multiProduce(noCompressionProducer)
+  }
 
-    // wait a bit for produced message to be available
-    val request = builder.build()
-    val response = consumer.fetch(request)
-    for( (topic, partition) <- topics) {
-      val fetched = response.messageSet(topic, 0)
-      assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
-    }
+  def testMultiProduceWithCompression() {
+    val props = producer.config.props.props
+    props.put("compression", "true")
+    val config = new ProducerConfig(props)
+    val producerWithCompression = new Producer[String, String](config)
+    multiProduce(producerWithCompression)
   }
 
   def testConsumerEmptyTopic() {
@@ -294,16 +233,15 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   }
 
   def testPipelinedProduceRequests() {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+    val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
+    createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
     val props = producer.config.props.props
     props.put("request.required.acks", "0")
     val pipelinedProducer: Producer[String, String] = new Producer(new ProducerConfig(props))
 
     // send some messages
-    val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
     for( (topic, partition) <- topics) {
       val messageList = List("a_" + topic, "b_" + topic)
       val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
@@ -338,10 +276,10 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
    * For testing purposes, just create these topics each with one partition and one replica for
    * which the provided broker should the leader for.  Create and wait for broker to lead.  Simple.
    */
-  def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
+  private def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Iterable[String]) {
     for( topic <- topics ) {
-      AdminUtils.createTopic(zkClient, topic, 1, 1)
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+      AdminUtils.createTopic(zkClient, topic, partitions = 1, replicationFactor = 1)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 0, timeoutMs = 500)
     }
   }
 }