You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/02/12 03:03:24 UTC
git commit: KAFKA-1233 Adding the new test file
Updated Branches:
refs/heads/trunk a0939f412 -> d6303ec79
KAFKA-1233 Adding the new test file
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d6303ec7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d6303ec7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d6303ec7
Branch: refs/heads/trunk
Commit: d6303ec79e6871c901951e7dcde93d4996b52c49
Parents: a0939f4
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue Feb 11 18:03:19 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Feb 11 18:03:19 2014 -0800
----------------------------------------------------------------------
.../kafka/api/ProducerSendTest.scala | 306 +++++++++++++++++++
1 file changed, 306 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d6303ec7/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
new file mode 100644
index 0000000..f8ba361
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{ZkUtils, Utils, TestUtils, Logging}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.admin.AdminUtils
+import kafka.consumer.SimpleConsumer
+import kafka.api.FetchRequestBuilder
+import kafka.message.Message
+
+import org.apache.kafka.clients.producer._
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Test
+import org.junit.Assert._
+
+import java.util.Properties
+import java.lang.{Integer, IllegalArgumentException}
+import org.apache.log4j.Logger
+
+
+class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
+ private val brokerId1 = 0
+ private val brokerId2 = 1
+ private val ports = TestUtils.choosePorts(2)
+ private val (port1, port2) = (ports(0), ports(1))
+ private var server1: KafkaServer = null
+ private var server2: KafkaServer = null
+ private var servers = List.empty[KafkaServer]
+
+ private var consumer1: SimpleConsumer = null
+ private var consumer2: SimpleConsumer = null
+
+ private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+ private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+ props1.put("num.partitions", "4")
+ props2.put("num.partitions", "4")
+ private val config1 = new KafkaConfig(props1)
+ private val config2 = new KafkaConfig(props2)
+
+ private val topic = "topic"
+ private val numRecords = 100
+
+ override def setUp() {
+ super.setUp()
+ // set up 2 brokers with 4 partitions each
+ server1 = TestUtils.createServer(config1)
+ server2 = TestUtils.createServer(config2)
+ servers = List(server1,server2)
+
+ // TODO: we need to migrate to new consumers when 0.9 is final
+ consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
+ consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
+ }
+
+ override def tearDown() {
+ server1.shutdown
+ server2.shutdown
+ Utils.rm(server1.config.logDirs)
+ Utils.rm(server2.config.logDirs)
+ super.tearDown()
+ }
+
+ class PrintOffsetCallback extends Callback {
+ def onCompletion(metadata: RecordMetadata, exception: Exception) {
+ if (exception != null)
+ fail("Send callback returns the following exception", exception)
+ try {
+ System.out.println("The message we just sent is marked as [" + metadata.partition + "] : " + metadata.offset);
+ } catch {
+ case e: Throwable => fail("Should succeed sending the message", e)
+ }
+ }
+ }
+
+ /**
+ * testSendOffset checks the basic send API behavior
+ *
+ * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
+ * 2. Last message of the non-blocking send should return the correct offset metadata
+ */
+ @Test
+ def testSendOffset() {
+ val props = new Properties()
+ props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+ var producer = new KafkaProducer(props)
+
+ val callback = new PrintOffsetCallback
+
+ try {
+ // create topic
+ TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+
+ // send a normal record
+ val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes)
+ val response0 = producer.send(record0, callback)
+ assertEquals("Should have offset 0", 0L, response0.get.offset)
+
+ // send a record with null value should be ok
+ val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null)
+ val response1 = producer.send(record1, callback)
+ assertEquals("Should have offset 1", 1L, response1.get.offset)
+
+ // send a record with null key should be ok
+ val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes)
+ val response2 = producer.send(record2, callback)
+ assertEquals("Should have offset 2", 2L, response2.get.offset)
+
+ // send a record with null part id should be ok
+ val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+ val response3 = producer.send(record3, callback)
+ assertEquals("Should have offset 3", 3L, response3.get.offset)
+
+ // send a record with null topic should fail
+ try {
+ val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes)
+ val response4 = producer.send(record4, callback)
+ response4.wait
+ } catch {
+ case iae: IllegalArgumentException => // this is ok
+ case e: Throwable => fail("Only expecting IllegalArgumentException", e)
+ }
+
+ // non-blocking send a list of records
+ for (i <- 1 to numRecords)
+ producer.send(record0)
+
+ // check that all messages have been acked via offset
+ val response5 = producer.send(record0, callback)
+ assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, response5.get.offset)
+
+ } finally {
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
+ }
+ }
+
+ /**
+ * testClose checks the closing behavior
+ *
+ * 1. After close() returns, all messages should be sent with correct returned offset metadata
+ */
+ @Test
+ def testClose() {
+ val props = new Properties()
+ props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+ var producer = new KafkaProducer(props)
+
+ try {
+ // create topic
+ TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+
+ // non-blocking send a list of records
+ val record0 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+ for (i <- 1 to numRecords)
+ producer.send(record0)
+ val response0 = producer.send(record0)
+
+ // close the producer
+ producer.close()
+ producer = null
+
+ // check that all messages have been acked via offset,
+ // this also checks that messages with same key go to the same partition
+ assertTrue("The last message should be acked before producer is shutdown", response0.isDone)
+ assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset)
+
+ } finally {
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
+ }
+ }
+
+ /**
+ * testSendToPartition checks the partitioning behavior
+ *
+ * 1. The default partitioner should have the correct round-robin behavior in assigning partitions
+ * 2. The specified partition-id should be respected
+ */
+ @Test
+ def testSendToPartition() {
+ val props = new Properties()
+ props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+ props.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "-1")
+ var producer = new KafkaProducer(props)
+
+ try {
+ // create topic
+ val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+
+ // make sure leaders exist
+ val leader0 = leaders.get(0)
+ val leader1 = leaders.get(1)
+ assertTrue("Leader for topic new-topic partition 0 should exist", leader0.isDefined)
+ assertTrue("Leader for topic new-topic partition 1 should exist", leader1.isDefined)
+
+ // case 1: use default partitioner, send 2*numRecords+2 messages with no partition-id/keys,
+ // they should be assigned to two partitions evenly as (1,3,5,7..) and (2,4,6,8..)
+ for (i <- 1 to 2 * numRecords) {
+ val record = new ProducerRecord(topic, null, null, ("value" + i).getBytes)
+ producer.send(record)
+ }
+
+ // make sure both partitions have acked back
+ val record0 = new ProducerRecord(topic, null, null, ("value" + (2 * numRecords + 1)).getBytes)
+ val response0 = producer.send(record0);
+ assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset)
+ val record1 = new ProducerRecord(topic, null, null, ("value" + (2 * numRecords + 2)).getBytes)
+ val response1 = producer.send(record1);
+ assertEquals("Should have offset " + numRecords, numRecords.toLong, response1.get.offset)
+
+ // get messages from partition 0, and check they has numRecords+1 messages
+ val fetchResponse0 = if(leader0.get == server1.config.brokerId) {
+ consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ } else {
+ consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ }
+ val messageSet0 = fetchResponse0.messageSet(topic, 0).iterator.toBuffer
+ assertEquals("Should have fetched " + (numRecords + 1) + " messages", numRecords + 1, messageSet0.size)
+
+ // if the first message gets 1, make sure the rest are (3,5,7..);
+ // if the first message gets 2, make sure the rest are (4,6,8..)
+ val startWithOne = messageSet0(0).message.equals(new Message(bytes = "value1".getBytes))
+ for (i <- 1 to numRecords) {
+ if(startWithOne) {
+ assertEquals(new Message(bytes = ("value" + (i * 2 + 1)).getBytes), messageSet0(i).message)
+ } else {
+ assertEquals(new Message(bytes = ("value" + (i * 2 + 2)).getBytes), messageSet0(i).message)
+ }
+ }
+
+ // case 2: check the specified partition id is respected by sending numRecords with partition-id 1
+ // and make sure all of them end up in partition 1
+ for (i <- 1 to numRecords - 1) {
+ val record = new ProducerRecord(topic, new Integer(1), null, ("value" + i).getBytes)
+ producer.send(record)
+ }
+ val record2 = new ProducerRecord(topic, new Integer(1), null, ("value" + numRecords).getBytes)
+ val response2 = producer.send(record2);
+ assertEquals("Should have offset " + 2 * numRecords, (2 * numRecords).toLong, response2.get.offset)
+
+ // start fetching from offset numRecords+1
+ val fetchResponse1 = if(leader1.get == server1.config.brokerId) {
+ consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, numRecords+1, Int.MaxValue).build())
+ }else {
+ consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, numRecords+1, Int.MaxValue).build())
+ }
+ val messageSet1 = fetchResponse1.messageSet(topic, 1).iterator.toBuffer
+
+ assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)
+
+ for (i <- 0 to numRecords - 1) {
+ assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message)
+ }
+ } finally {
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
+ }
+ }
+
+ @Test
+ def testAutoCreateTopic() {
+ val props = new Properties()
+ props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+ var producer = new KafkaProducer(props)
+
+ try {
+ // Send a message to auto-create the topic
+ val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+ val response = producer.send(record)
+ assertEquals("Should have offset 0", 0L, response.get.offset)
+
+ // double check that the topic is created with leader elected
+ assertTrue("Topic should already be created with leader", TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 0).isDefined)
+
+ } finally {
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
+ }
+ }
+}
\ No newline at end of file