Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/08 03:59:55 UTC
svn commit: r1241754 [2/2] - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/common/ main/scala/kafka/javaapi/
main/scala/kafka/javaapi/producer/ main/scala/kafka/javaapi/producer/async/
main/scala/kafka/producer/ main/scala/kafka/producer/...
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Wed Feb 8 02:59:54 2012
@@ -17,287 +17,385 @@
package kafka.producer
-import junit.framework.Assert
-import java.util.Properties
import org.easymock.EasyMock
import kafka.api.ProducerRequest
-import org.apache.log4j.{Logger, Level}
import org.junit.Test
import org.scalatest.junit.JUnitSuite
import kafka.producer.async._
-import kafka.serializer.Encoder
+import java.util.concurrent.LinkedBlockingQueue
+import junit.framework.Assert._
+import collection.SortedSet
+import kafka.cluster.{Broker, Partition}
+import collection.mutable.{HashMap, ListBuffer}
+import collection.Map
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
+import java.util.{LinkedList, Properties}
+import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.common.{InvalidConfigException, NoBrokersForPartitionException, InvalidPartitionException}
class AsyncProducerTest extends JUnitSuite {
- private val messageContent1 = "test"
- private val topic1 = "test-topic"
- private val message1: Message = new Message(messageContent1.getBytes)
-
- private val messageContent2 = "test1"
- private val topic2 = "test1$topic"
- private val message2: Message = new Message(messageContent2.getBytes)
- val asyncProducerLogger = Logger.getLogger(classOf[AsyncProducer[String]])
-
@Test
def testProducerQueueSize() {
- val basicProducer = EasyMock.createMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
- getMessageSetOfSize(List(message1), 10)))))
- EasyMock.expectLastCall
- basicProducer.close
- EasyMock.expectLastCall
- EasyMock.replay(basicProducer)
+ // a mock event handler that blocks
+ val mockEventHandler = new EventHandler[String,String] {
- val props = new Properties()
- props.put("host", "localhost")
- props.put("port", "9092")
- props.put("queue.size", "10")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- val config = new AsyncProducerConfig(props)
+ def handle(events: Seq[ProducerData[String,String]]) {
+ Thread.sleep(1000000)
+ }
- val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+ def close {}
+ }
- //temporarily set log4j to a higher level to avoid error in the output
- producer.setLoggerLevel(Level.FATAL)
+ val props = new Properties()
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("broker.list", "0:localhost:9092")
+ props.put("producer.type", "async")
+ props.put("queue.size", "10")
+ props.put("batch.size", "1")
+ val config = new ProducerConfig(props)
+ val produceData = getProduceData(12)
+ val producer = new Producer[String, String](config, mockEventHandler)
try {
- for(i <- 0 until 11) {
- producer.send(messageContent1 + "-topic", messageContent1)
- }
- Assert.fail("Queue should be full")
+      // send all 12 messages; queue.size is 10 and the handler blocks, so the queue fills up and overflows
+ producer.send(produceData: _*)
+ fail("Queue should be full")
}
catch {
- case e: QueueFullException => println("Queue is full..")
+ case e: QueueFullException => //expected
}
- producer.start
- producer.close
- Thread.sleep(2000)
- EasyMock.verify(basicProducer)
- producer.setLoggerLevel(Level.ERROR)
}
@Test
- def testAddAfterQueueClosed() {
- val basicProducer = EasyMock.createMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
- getMessageSetOfSize(List(message1), 10)))))
- EasyMock.expectLastCall
- basicProducer.close
- EasyMock.expectLastCall
- EasyMock.replay(basicProducer)
-
+ def testProduceAfterClosed() {
val props = new Properties()
- props.put("host", "localhost")
- props.put("port", "9092")
- props.put("queue.size", "10")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- val config = new AsyncProducerConfig(props)
-
- val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
-
- producer.start
- for(i <- 0 until 10) {
- producer.send(messageContent1 + "-topic", messageContent1)
- }
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("broker.list", "0:localhost:9092")
+ props.put("producer.type", "async")
+ props.put("batch.size", "1")
+
+ val config = new ProducerConfig(props)
+ val produceData = getProduceData(10)
+ val producer = new Producer[String, String](config)
producer.close
try {
- producer.send(messageContent1 + "-topic", messageContent1)
- Assert.fail("Queue should be closed")
- } catch {
- case e: QueueClosedException =>
+ producer.send(produceData: _*)
+ fail("should complain that producer is already closed")
+ }
+ catch {
+ case e: ProducerClosedException => //expected
}
- EasyMock.verify(basicProducer)
+ }
+
+ def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = {
+ val producerDataList = new ListBuffer[ProducerData[String,String]]
+ for (i <- 0 until nEvents)
+ producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i)))
+ producerDataList
}
@Test
def testBatchSize() {
- val basicProducer = EasyMock.createStrictMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
- getMessageSetOfSize(List(message1), 5)))))
- EasyMock.expectLastCall.times(2)
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
- getMessageSetOfSize(List(message1), 1)))))
- EasyMock.expectLastCall
- basicProducer.close
- EasyMock.expectLastCall
- EasyMock.replay(basicProducer)
-
- val props = new Properties()
- props.put("host", "localhost")
- props.put("port", "9092")
- props.put("queue.size", "10")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("batch.size", "5")
+ /**
+ * Send a total of 10 messages with batch size of 5. Expect 2 calls to the handler, one for each batch.
+ */
+ val producerDataList = getProduceData(10)
+ val mockHandler = EasyMock.createStrictMock(classOf[DefaultEventHandler[String,String]])
+ mockHandler.handle(producerDataList.take(5))
+ EasyMock.expectLastCall
+ mockHandler.handle(producerDataList.takeRight(5))
+ EasyMock.expectLastCall
+ EasyMock.replay(mockHandler)
+
+ val queue = new LinkedBlockingQueue[ProducerData[String,String]](10)
+ val producerSendThread =
+ new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5)
+ producerSendThread.start()
- val config = new AsyncProducerConfig(props)
+ for (producerData <- producerDataList)
+ queue.put(producerData)
- val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+ producerSendThread.shutdown
+ EasyMock.verify(mockHandler)
+ }
- producer.start
- for(i <- 0 until 10) {
- producer.send(messageContent1 + "-topic", messageContent1)
- }
+ @Test
+ def testQueueTimeExpired() {
+ /**
+ * Send a total of 2 messages with batch size of 5 and queue time of 200ms.
+     * Expect 1 call to the handler after 200ms.
+ */
+ val producerDataList = getProduceData(2)
+ val mockHandler = EasyMock.createStrictMock(classOf[DefaultEventHandler[String,String]])
+ mockHandler.handle(producerDataList)
+ EasyMock.expectLastCall
+ EasyMock.replay(mockHandler)
+
+ val queue = new LinkedBlockingQueue[ProducerData[String,String]](10)
+ val producerSendThread =
+ new ProducerSendThread[String,String]("thread1", queue, mockHandler, 200, 5)
+ producerSendThread.start()
- Thread.sleep(100)
- try {
- producer.send(messageContent1 + "-topic", messageContent1)
- } catch {
- case e: QueueFullException =>
- Assert.fail("Queue should not be full")
- }
+ for (producerData <- producerDataList)
+ queue.put(producerData)
- producer.close
- EasyMock.verify(basicProducer)
+ Thread.sleep(300)
+ producerSendThread.shutdown
+ EasyMock.verify(mockHandler)
}
@Test
- def testQueueTimeExpired() {
- val basicProducer = EasyMock.createMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
- getMessageSetOfSize(List(message1), 3)))))
- EasyMock.expectLastCall
- basicProducer.close
- EasyMock.expectLastCall
- EasyMock.replay(basicProducer)
+ def testPartitionAndCollateEvents() {
+ val producerDataList = new ListBuffer[ProducerData[Int,Message]]
+ producerDataList.append(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)))
+ producerDataList.append(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
+ producerDataList.append(new ProducerData[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
+ producerDataList.append(new ProducerData[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
+ producerDataList.append(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
val props = new Properties()
- props.put("host", "localhost")
- props.put("port", "9092")
- props.put("queue.size", "10")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("queue.time", "200")
+ props.put("broker.list", "0:localhost:9092,1:localhost:9092")
- val config = new AsyncProducerConfig(props)
-
- val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
- val serializer = new StringSerializer
-
- producer.start
- for(i <- 0 until 3) {
- producer.send(serializer.getTopic(messageContent1), messageContent1, ProducerRequest.RandomPartition)
+ val intPartitioner = new Partitioner[Int] {
+ def partition(key: Int, numPartitions: Int): Int = key % numPartitions
}
+ val config = new ProducerConfig(props)
+ val handler = new DefaultEventHandler[Int,String](config,
+ partitioner = intPartitioner,
+ encoder = null.asInstanceOf[Encoder[String]],
+ producerPool = null,
+ populateProducerPool = false,
+ brokerPartitionInfo = null)
+
+ val topic1Broker1Data = new ListBuffer[ProducerData[Int,Message]]
+ topic1Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
+ new ProducerData[Int,Message]("topic1", 2, new Message("msg3".getBytes))))
+ val topic1Broker2Data = new ListBuffer[ProducerData[Int,Message]]
+ topic1Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic1", 3, new Message("msg4".getBytes))))
+ val topic2Broker1Data = new ListBuffer[ProducerData[Int,Message]]
+ topic2Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes))))
+ val topic2Broker2Data = new ListBuffer[ProducerData[Int,Message]]
+ topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))))
+ val expectedResult = Map(
+ 0 -> Map(
+ ("topic1", -1) -> topic1Broker1Data,
+ ("topic2", -1) -> topic2Broker1Data),
+ 1 -> Map(
+ ("topic1", -1) -> topic1Broker2Data,
+ ("topic2", -1) -> topic2Broker2Data)
+ )
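+    // note: each (topic, partition) key carries partition -1, i.e. ProducerRequest.RandomPartition;
+    // with broker.list in use the concrete partition is presumably only resolved at send time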
- Thread.sleep(300)
- producer.close
- EasyMock.verify(basicProducer)
+ val actualResult = handler.partitionAndCollate(producerDataList)
+ assertEquals(expectedResult, actualResult)
}
@Test
- def testSenderThreadShutdown() {
- val syncProducerProps = new Properties()
- syncProducerProps.put("host", "localhost")
- syncProducerProps.put("port", "9092")
- syncProducerProps.put("buffer.size", "1000")
- syncProducerProps.put("connect.timeout.ms", "1000")
- syncProducerProps.put("reconnect.interval", "1000")
- val basicProducer = new MockProducer(new SyncProducerConfig(syncProducerProps))
-
- val asyncProducerProps = new Properties()
- asyncProducerProps.put("host", "localhost")
- asyncProducerProps.put("port", "9092")
- asyncProducerProps.put("queue.size", "10")
- asyncProducerProps.put("serializer.class", "kafka.producer.StringSerializer")
- asyncProducerProps.put("queue.time", "100")
-
- val config = new AsyncProducerConfig(asyncProducerProps)
- val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
- producer.start
- producer.send(messageContent1 + "-topic", messageContent1)
- producer.close
+ def testSerializeEvents() {
+ val produceData = TestUtils.getMsgStrings(5).map(m => new ProducerData[String,String]("topic1",m))
+ val props = new Properties()
+ props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+ val config = new ProducerConfig(props)
+ val handler = new DefaultEventHandler[String,String](config,
+ partitioner = null.asInstanceOf[Partitioner[String]],
+ encoder = new StringEncoder,
+ producerPool = null,
+ populateProducerPool = false,
+ brokerPartitionInfo = null)
+
+ val serializedData = handler.serialize(produceData)
+ val decoder = new StringDecoder
+ val deserializedData = serializedData.map(d => new ProducerData[String,String](d.getTopic, d.getData.map(m => decoder.toEvent(m))))
+ TestUtils.checkEquals(produceData.iterator, deserializedData.iterator)
}
@Test
- def testCollateEvents() {
- val basicProducer = EasyMock.createMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, ProducerRequest.RandomPartition,
- getMessageSetOfSize(List(message2), 5)),
- new ProducerRequest(topic1, ProducerRequest.RandomPartition,
- getMessageSetOfSize(List(message1), 5)))))
- EasyMock.expectLastCall
- basicProducer.close
- EasyMock.expectLastCall
- EasyMock.replay(basicProducer)
+ def testInvalidPartition() {
+ val producerDataList = new ListBuffer[ProducerData[String,Message]]
+ producerDataList.append(new ProducerData[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
+ val props = new Properties()
+ props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+ val config = new ProducerConfig(props)
+ val handler = new DefaultEventHandler[String,String](config,
+ partitioner = new NegativePartitioner,
+ encoder = null.asInstanceOf[Encoder[String]],
+ producerPool = null,
+ populateProducerPool = false,
+ brokerPartitionInfo = null)
+ try {
+ handler.partitionAndCollate(producerDataList)
+ fail("Should fail with InvalidPartitionException")
+ }
+ catch {
+ case e: InvalidPartitionException => // expected, do nothing
+ }
+ }
+
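+  // a stub BrokerPartitionInfo describing an empty cluster: no partitions, no brokers.
+  // testNoBroker below relies on it to provoke a NoBrokersForPartitionException.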
+  private def getMockBrokerPartitionInfo(): BrokerPartitionInfo = {
+ new BrokerPartitionInfo {
+ def getBrokerPartitionInfo(topic: String = null): SortedSet[Partition] = SortedSet.empty[Partition]
+
+ def getBrokerInfo(brokerId: Int): Option[Broker] = None
+
+ def getAllBrokerInfo: Map[Int, Broker] = new HashMap[Int, Broker]
+
+ def updateInfo = {}
+ def close = {}
+ }
+ }
+
+ @Test
+ def testNoBroker() {
+ val producerDataList = new ListBuffer[ProducerData[String,String]]
+ producerDataList.append(new ProducerData[String,String]("topic1", "msg1"))
val props = new Properties()
- props.put("host", "localhost")
- props.put("port", "9092")
- props.put("queue.size", "50")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("batch.size", "10")
-
- val config = new AsyncProducerConfig(props)
-
- val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
-
- producer.start
- val serializer = new StringSerializer
- for(i <- 0 until 5) {
- producer.send(messageContent1 + "-topic", messageContent1)
- producer.send(messageContent2 + "$topic", messageContent2, ProducerRequest.RandomPartition)
+ val config = new ProducerConfig(props)
+ val handler = new DefaultEventHandler[String,String](config,
+ partitioner = null.asInstanceOf[Partitioner[String]],
+ encoder = new StringEncoder,
+ producerPool = null,
+ populateProducerPool = false,
+ brokerPartitionInfo = getMockBrokerPartitionInfo)
+ try {
+ handler.handle(producerDataList)
+ fail("Should fail with NoBrokersForPartitionException")
+ }
+ catch {
+ case e: NoBrokersForPartitionException => // expected, do nothing
}
+ }
- producer.close
- EasyMock.verify(basicProducer)
+ @Test
+ def testIncompatibleEncoder() {
+ val props = new Properties()
+ props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+ val config = new ProducerConfig(props)
+    val producer = new Producer[String, String](config)
+ try {
+ producer.send(getProduceData(1): _*)
+ fail("Should fail with ClassCastException due to incompatible Encoder")
+ } catch {
+ case e: ClassCastException =>
+ }
}
@Test
- def testCollateAndSerializeEvents() {
- val basicProducer = EasyMock.createMock(classOf[SyncProducer])
- basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1,
- getMessageSetOfSize(List(message2), 5)),
- new ProducerRequest(topic1, 0,
- getMessageSetOfSize(List(message1), 5)),
- new ProducerRequest(topic1, 1,
- getMessageSetOfSize(List(message1), 5)),
- new ProducerRequest(topic2, 0,
- getMessageSetOfSize(List(message2), 5)))))
+ def testRandomPartitioner() {
+ val props = new Properties()
+ props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+ val config = new ProducerConfig(props)
+ val handler = new DefaultEventHandler[String,String](config,
+ partitioner = null.asInstanceOf[Partitioner[String]],
+ encoder = null.asInstanceOf[Encoder[String]],
+ producerPool = null,
+ populateProducerPool = false,
+ brokerPartitionInfo = null)
+ val producerDataList = new ListBuffer[ProducerData[String,Message]]
+ producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg1".getBytes)))
+ producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes)))
+ producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg3".getBytes)))
+
+ val partitionedData = handler.partitionAndCollate(producerDataList)
+ for ((brokerId, dataPerBroker) <- partitionedData) {
+ for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker)
+ assertTrue(partitionId == ProducerRequest.RandomPartition)
+ }
+ }
+ @Test
+ def testBrokerListAndAsync() {
+ val topic = "topic1"
+ val msgs = TestUtils.getMsgStrings(10)
+ val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
+ mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, ProducerRequest.RandomPartition,
+ messagesToSet(msgs.take(5))))))
EasyMock.expectLastCall
- basicProducer.close
+ mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, ProducerRequest.RandomPartition,
+ messagesToSet(msgs.takeRight(5))))))
EasyMock.expectLastCall
- EasyMock.replay(basicProducer)
+ mockSyncProducer.close
+ EasyMock.expectLastCall
+ EasyMock.replay(mockSyncProducer)
val props = new Properties()
- props.put("host", "localhost")
- props.put("port", "9092")
- props.put("queue.size", "50")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("batch.size", "20")
-
- val config = new AsyncProducerConfig(props)
-
- val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
-
- producer.start
- val serializer = new StringSerializer
- for(i <- 0 until 5) {
- producer.send(topic2, messageContent2, 0)
- producer.send(topic2, messageContent2, 1)
- producer.send(topic1, messageContent1, 0)
- producer.send(topic1, messageContent1, 1)
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("producer.type", "async")
+ props.put("batch.size", "5")
+ props.put("broker.list", "0:localhost:9092")
+
+ val config = new ProducerConfig(props)
+ val producerPool = new ProducerPool(config)
+ producerPool.addProducer(0, mockSyncProducer)
+
+ val handler = new DefaultEventHandler[String,String](config,
+ partitioner = null.asInstanceOf[Partitioner[String]],
+ encoder = new StringEncoder,
+ producerPool = producerPool,
+ populateProducerPool = false,
+ brokerPartitionInfo = null)
+
+ val producer = new Producer[String, String](config, handler)
+ try {
+ // send all 10 messages, should create 2 batches and 2 syncproducer calls
+ producer.send(msgs.map(m => new ProducerData[String,String](topic, List(m))): _*)
+ producer.close
+
+ } catch {
+ case e: Exception => fail("Not expected", e)
}
- producer.close
- EasyMock.verify(basicProducer)
+ EasyMock.verify(mockSyncProducer)
+ }
+ @Test
+ def testJavaProducer() {
+ val topic = "topic1"
+ val msgs = TestUtils.getMsgStrings(5)
+ val scalaProducerData = msgs.map(m => new ProducerData[String, String](topic, List(m)))
+ val javaProducerData = scala.collection.JavaConversions.asList(msgs.map(m => {
+ val javaList = new LinkedList[String]()
+ javaList.add(m)
+ new kafka.javaapi.producer.ProducerData[String, String](topic, javaList)
+ }))
+
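+    // the javaapi producer should delegate straight to the underlying scala producer, so the
+    // mock expects exactly one single-message send and one batched send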
+ val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]])
+ mockScalaProducer.send(scalaProducerData.head)
+ EasyMock.expectLastCall()
+ mockScalaProducer.send(scalaProducerData: _*)
+ EasyMock.expectLastCall()
+ EasyMock.replay(mockScalaProducer)
+
+ val javaProducer = new kafka.javaapi.producer.Producer[String, String](mockScalaProducer)
+ javaProducer.send(javaProducerData.get(0))
+ javaProducer.send(javaProducerData)
+
+ EasyMock.verify(mockScalaProducer)
}
- private def getMessageSetOfSize(messages: List[Message], counts: Int): ByteBufferMessageSet = {
- var messageList = new Array[Message](counts)
- for(message <- messages) {
- for(i <- 0 until counts) {
- messageList(i) = message
- }
+ @Test
+ def testInvalidConfiguration() {
+ val props = new Properties()
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("broker.list", "0:localhost:9092")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
+ props.put("producer.type", "async")
+
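+    // broker.list and zk.connect are set together here, which ProducerConfig is expected to reject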
+ try {
+ new ProducerConfig(props)
+ fail("should complain about wrong config")
+ }
+ catch {
+ case e: InvalidConfigException => //expected
}
- new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
}
- class StringSerializer extends Encoder[String] {
- def toMessage(event: String):Message = new Message(event.getBytes)
- def getTopic(event: String): String = event.concat("-topic")
+ private def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
+ val encoder = new StringEncoder
+ new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
}
class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Wed Feb 8 02:59:54 2012
@@ -17,23 +17,18 @@
package kafka.producer
-import async.AsyncProducer
-import java.util.Properties
import org.apache.log4j.{Logger, Level}
-import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
import kafka.zk.EmbeddedZookeeper
import org.junit.{After, Before, Test}
-import junit.framework.Assert
-import org.easymock.EasyMock
-import java.util.concurrent.ConcurrentHashMap
-import kafka.cluster.Partition
+import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
-import kafka.common.{InvalidConfigException, UnavailableProducerException, InvalidPartitionException}
import kafka.utils.{TestUtils, TestZKUtils, Utils}
-import kafka.serializer.{StringEncoder, Encoder}
-import kafka.consumer.SimpleConsumer
import kafka.api.FetchRequest
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.message.Message
+import kafka.serializer.Encoder
+import kafka.consumer.SimpleConsumer
+import java.util.Properties
+import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
class ProducerTest extends JUnitSuite {
private val topic = "test-topic"
@@ -43,8 +38,6 @@ class ProducerTest extends JUnitSuite {
private val (port1, port2) = (ports(0), ports(1))
private var server1: KafkaServer = null
private var server2: KafkaServer = null
- private var producer1: SyncProducer = null
- private var producer2: SyncProducer = null
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null
private var zkServer:EmbeddedZookeeper = null
@@ -71,16 +64,6 @@ class ProducerTest extends JUnitSuite {
props.put("host", "localhost")
props.put("port", port1.toString)
- producer1 = new SyncProducer(new SyncProducerConfig(props))
- producer1.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = new Message("test".getBytes())))
-
- producer2 = new SyncProducer(new SyncProducerConfig(props) {
- override val port = port2
- })
- producer2.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = new Message("test".getBytes())))
-
consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024)
consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024)
@@ -104,293 +87,6 @@ class ProducerTest extends JUnitSuite {
}
@Test
- def testSend() {
- val props = new Properties()
- props.put("partitioner.class", "kafka.producer.StaticPartitioner")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("zk.connect", TestZKUtils.zookeeperConnect)
- val config = new ProducerConfig(props)
- val partitioner = new StaticPartitioner
- val serializer = new StringSerializer
-
- // 2 sync producers
- val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
- val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
- val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
- // it should send to partition 0 (first partition) on second broker i.e broker2
- syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes)))
- EasyMock.expectLastCall
- syncProducer1.close
- EasyMock.expectLastCall
- syncProducer2.close
- EasyMock.expectLastCall
- EasyMock.replay(syncProducer1)
- EasyMock.replay(syncProducer2)
-
- syncProducers.put(brokerId1, syncProducer1)
- syncProducers.put(brokerId2, syncProducer2)
-
- val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
- val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
-
- producer.send(new ProducerData[String, String](topic, "test", Array("test1")))
- producer.close
-
- EasyMock.verify(syncProducer1)
- EasyMock.verify(syncProducer2)
- }
-
- @Test
- def testSendSingleMessage() {
- val props = new Properties()
- props.put("serializer.class", "kafka.serializer.StringEncoder")
- props.put("broker.list", "0:localhost:9092")
-
-
- val config = new ProducerConfig(props)
- val partitioner = new StaticPartitioner
- val serializer = new StringSerializer
-
- // 2 sync producers
- val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
- val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
- // it should send to a random partition due to use of broker.list
- syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
- EasyMock.expectLastCall
- syncProducer1.close
- EasyMock.expectLastCall
- EasyMock.replay(syncProducer1)
-
- syncProducers.put(brokerId1, syncProducer1)
-
- val producerPool = new ProducerPool[String](config, serializer, syncProducers,
- new ConcurrentHashMap[Int, AsyncProducer[String]]())
- val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
-
- producer.send(new ProducerData[String, String](topic, "t"))
- producer.close
-
- EasyMock.verify(syncProducer1)
- }
-
- @Test
- def testInvalidPartition() {
- val props = new Properties()
- props.put("partitioner.class", "kafka.producer.NegativePartitioner")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("zk.connect", TestZKUtils.zookeeperConnect)
- val config = new ProducerConfig(props)
-
- val richProducer = new Producer[String, String](config)
- try {
- richProducer.send(new ProducerData[String, String](topic, "test", Array("test")))
- Assert.fail("Should fail with InvalidPartitionException")
- }catch {
- case e: InvalidPartitionException => // expected, do nothing
- }
- }
-
- @Test
- def testDefaultEncoder() {
- val props = new Properties()
- props.put("zk.connect", TestZKUtils.zookeeperConnect)
- val config = new ProducerConfig(props)
-
- val stringProducer1 = new Producer[String, String](config)
- try {
- stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test")))
- fail("Should fail with ClassCastException due to incompatible Encoder")
- } catch {
- case e: ClassCastException =>
- }
-
- props.put("serializer.class", "kafka.serializer.StringEncoder")
- val stringProducer2 = new Producer[String, String](new ProducerConfig(props))
- stringProducer2.send(new ProducerData[String, String](topic, "test", Array("test")))
-
- val messageProducer1 = new Producer[String, Message](config)
- try {
- messageProducer1.send(new ProducerData[String, Message](topic, "test", Array(new Message("test".getBytes))))
- } catch {
- case e: ClassCastException => fail("Should not fail with ClassCastException due to default Encoder")
- }
- }
-
- @Test
- def testSyncProducerPool() {
- // 2 sync producers
- val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
- val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
- val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
- syncProducer1.send("test-topic", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes)))
- EasyMock.expectLastCall
- syncProducer1.close
- EasyMock.expectLastCall
- syncProducer2.close
- EasyMock.expectLastCall
- EasyMock.replay(syncProducer1)
- EasyMock.replay(syncProducer2)
-
- syncProducers.put(brokerId1, syncProducer1)
- syncProducers.put(brokerId2, syncProducer2)
-
- // default for producer.type is "sync"
- val props = new Properties()
- props.put("partitioner.class", "kafka.producer.NegativePartitioner")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
- syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
- producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
-
- producerPool.close
- EasyMock.verify(syncProducer1)
- EasyMock.verify(syncProducer2)
- }
-
- @Test
- def testAsyncProducerPool() {
- // 2 async producers
- val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
- val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
- val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
- asyncProducer1.send(topic, "test1", 0)
- EasyMock.expectLastCall
- asyncProducer1.close
- EasyMock.expectLastCall
- asyncProducer2.close
- EasyMock.expectLastCall
- EasyMock.replay(asyncProducer1)
- EasyMock.replay(asyncProducer2)
-
- asyncProducers.put(brokerId1, asyncProducer1)
- asyncProducers.put(brokerId2, asyncProducer2)
-
- // change producer.type to "async"
- val props = new Properties()
- props.put("partitioner.class", "kafka.producer.NegativePartitioner")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("producer.type", "async")
- val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
- new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
- producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
-
- producerPool.close
- EasyMock.verify(asyncProducer1)
- EasyMock.verify(asyncProducer2)
- }
-
- @Test
- def testSyncUnavailableProducerException() {
- val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
- val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
- val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
- syncProducer2.close
- EasyMock.expectLastCall
- EasyMock.replay(syncProducer1)
- EasyMock.replay(syncProducer2)
-
- syncProducers.put(brokerId2, syncProducer2)
-
- // default for producer.type is "sync"
- val props = new Properties()
- props.put("partitioner.class", "kafka.producer.NegativePartitioner")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
- syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
- try {
- producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
- Assert.fail("Should fail with UnavailableProducerException")
- }catch {
- case e: UnavailableProducerException => // expected
- }
-
- producerPool.close
- EasyMock.verify(syncProducer1)
- EasyMock.verify(syncProducer2)
- }
-
- @Test
- def testAsyncUnavailableProducerException() {
- val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
- val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
- val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
- asyncProducer2.close
- EasyMock.expectLastCall
- EasyMock.replay(asyncProducer1)
- EasyMock.replay(asyncProducer2)
-
- asyncProducers.put(brokerId2, asyncProducer2)
-
- // change producer.type to "async"
- val props = new Properties()
- props.put("partitioner.class", "kafka.producer.NegativePartitioner")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("producer.type", "async")
- val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
- new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
- try {
- producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
- Assert.fail("Should fail with UnavailableProducerException")
- }catch {
- case e: UnavailableProducerException => // expected
- }
-
- producerPool.close
- EasyMock.verify(asyncProducer1)
- EasyMock.verify(asyncProducer2)
- }
-
- @Test
- def testConfigBrokerPartitionInfoWithPartitioner {
- val props = new Properties()
- props.put("partitioner.class", "kafka.producer.StaticPartitioner")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("producer.type", "async")
- props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1 + ":" + 4 + "," +
- brokerId2 + ":" + "localhost" + ":" + port2 + ":" + 4)
-
- var config: ProducerConfig = null
- try {
- config = new ProducerConfig(props)
- fail("should fail with InvalidConfigException due to presence of partitioner.class and broker.list")
- }catch {
- case e: InvalidConfigException => // expected
- }
- }
-
- @Test
- def testConfigBrokerPartitionInfo() {
- val props = new Properties()
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("producer.type", "async")
- props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
-
- val config = new ProducerConfig(props)
- val partitioner = new StaticPartitioner
- val serializer = new StringSerializer
-
- // 2 async producers
- val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
- val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
- // it should send to a random partition due to use of broker.list
- asyncProducer1.send(topic, "test1", -1)
- EasyMock.expectLastCall
- asyncProducer1.close
- EasyMock.expectLastCall
- EasyMock.replay(asyncProducer1)
-
- asyncProducers.put(brokerId1, asyncProducer1)
-
- val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
- val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
-
- producer.send(new ProducerData[String, String](topic, "test1", Array("test1")))
- producer.close
-
- EasyMock.verify(asyncProducer1)
- }
-
- @Test
def testZKSendToNewTopic() {
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
@@ -398,7 +94,6 @@ class ProducerTest extends JUnitSuite {
props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new ProducerConfig(props)
- val serializer = new StringEncoder
val producer = new Producer[String, String](config)
try {
@@ -412,11 +107,11 @@ class ProducerTest extends JUnitSuite {
Thread.sleep(100)
// cross check if brokers got the messages
val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
- Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
- Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+ assertTrue("Message set should have 1 message", messageSet1.hasNext)
+ assertEquals(new Message("test1".getBytes), messageSet1.next.message)
val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
- Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
- Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message)
+ assertTrue("Message set should have 1 message", messageSet2.hasNext)
+ assertEquals(new Message("test1".getBytes), messageSet2.next.message)
} catch {
case e: Exception => fail("Not expected", e)
}
@@ -431,7 +126,6 @@ class ProducerTest extends JUnitSuite {
props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new ProducerConfig(props)
- val serializer = new StringEncoder
val producer = new Producer[String, String](config)
try {
@@ -449,10 +143,10 @@ class ProducerTest extends JUnitSuite {
Thread.sleep(100)
// cross check if brokers got the messages
val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
- Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
- Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
- Assert.assertTrue("Message set should have another message", messageSet1.hasNext)
- Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+ assertTrue("Message set should have 1 message", messageSet1.hasNext)
+ assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+ assertTrue("Message set should have another message", messageSet1.hasNext)
+ assertEquals(new Message("test1".getBytes), messageSet1.next.message)
} catch {
case e: Exception => fail("Not expected")
}
@@ -467,7 +161,6 @@ class ProducerTest extends JUnitSuite {
props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new ProducerConfig(props)
- val serializer = new StringEncoder
val producer = new Producer[String, String](config)
var server: KafkaServer = null
@@ -482,8 +175,8 @@ class ProducerTest extends JUnitSuite {
Thread.sleep(100)
// cross check if brokers got the messages
val messageSet1 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
- Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
- Assert.assertEquals(new Message("test".getBytes), messageSet1.next.message)
+ assertTrue("Message set should have 1 message", messageSet1.hasNext)
+ assertEquals(new Message("test".getBytes), messageSet1.next.message)
// shutdown server2
server2.shutdown
@@ -505,8 +198,8 @@ class ProducerTest extends JUnitSuite {
// cross check if brokers got the messages
val messageSet2 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
- Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
- Assert.assertEquals(new Message("test".getBytes), messageSet2.next.message)
+ assertTrue("Message set should have 1 message", messageSet2.hasNext)
+ assertEquals(new Message("test".getBytes), messageSet2.next.message)
} catch {
case e: Exception => fail("Not expected", e)
@@ -516,152 +209,6 @@ class ProducerTest extends JUnitSuite {
}
}
- @Test
- def testPartitionedSendToNewTopic() {
- val props = new Properties()
- props.put("partitioner.class", "kafka.producer.StaticPartitioner")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("zk.connect", TestZKUtils.zookeeperConnect)
-
- val config = new ProducerConfig(props)
- val partitioner = new StaticPartitioner
- val serializer = new StringSerializer
-
- // 2 sync producers
- val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
- val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
- val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
- syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = new Message("test1".getBytes)))
- EasyMock.expectLastCall
- syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = new Message("test1".getBytes)))
- EasyMock.expectLastCall
- syncProducer1.close
- EasyMock.expectLastCall
- syncProducer2.close
- EasyMock.expectLastCall
- EasyMock.replay(syncProducer1)
- EasyMock.replay(syncProducer2)
-
- syncProducers.put(brokerId1, syncProducer1)
- syncProducers.put(brokerId2, syncProducer2)
-
- val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
- val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
-
- producer.send(new ProducerData[String, String]("test-topic1", "test", Array("test1")))
- Thread.sleep(100)
-
- // now send again to this topic using a real producer, this time all brokers would have registered
- // their partitions in zookeeper
- producer1.send("test-topic1", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = new Message("test".getBytes())))
- Thread.sleep(100)
-
- // wait for zookeeper to register the new topic
- producer.send(new ProducerData[String, String]("test-topic1", "test1", Array("test1")))
- Thread.sleep(100)
- producer.close
-
- EasyMock.verify(syncProducer1)
- EasyMock.verify(syncProducer2)
- }
-
- @Test
- def testPartitionedSendToNewBrokerInExistingTopic() {
- val props = new Properties()
- props.put("partitioner.class", "kafka.producer.StaticPartitioner")
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("zk.connect", TestZKUtils.zookeeperConnect)
-
- val config = new ProducerConfig(props)
- val partitioner = new StaticPartitioner
- val serializer = new StringSerializer
-
- // 2 sync producers
- val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
- val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
- val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
- val syncProducer3 = EasyMock.createMock(classOf[SyncProducer])
- syncProducer3.send("test-topic", 2, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = new Message("test1".getBytes)))
- EasyMock.expectLastCall
- syncProducer1.close
- EasyMock.expectLastCall
- syncProducer2.close
- EasyMock.expectLastCall
- syncProducer3.close
- EasyMock.expectLastCall
- EasyMock.replay(syncProducer1)
- EasyMock.replay(syncProducer2)
- EasyMock.replay(syncProducer3)
-
- syncProducers.put(brokerId1, syncProducer1)
- syncProducers.put(brokerId2, syncProducer2)
- syncProducers.put(2, syncProducer3)
-
- val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
- val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
-
- val port = TestUtils.choosePort
- val serverProps = TestUtils.createBrokerConfig(2, port)
- val serverConfig = new KafkaConfig(serverProps) {
- override val numPartitions = 4
- }
-
- val server3 = TestUtils.createServer(serverConfig)
- Thread.sleep(500)
- // send a message to the new broker to register it under topic "test-topic"
- val tempProps = new Properties()
- tempProps.put("host", "localhost")
- tempProps.put("port", port.toString)
- val tempProducer = new SyncProducer(new SyncProducerConfig(tempProps))
- tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
- messages = new Message("test".getBytes())))
-
- Thread.sleep(500)
- producer.send(new ProducerData[String, String]("test-topic", "test-topic", Array("test1")))
- producer.close
-
- EasyMock.verify(syncProducer1)
- EasyMock.verify(syncProducer2)
- EasyMock.verify(syncProducer3)
-
- server3.shutdown
- Utils.rm(server3.config.logDir)
- }
-
- @Test
- def testDefaultPartitioner() {
- val props = new Properties()
- props.put("serializer.class", "kafka.producer.StringSerializer")
- props.put("producer.type", "async")
- props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
- val config = new ProducerConfig(props)
- val partitioner = new DefaultPartitioner[String]
- val serializer = new StringSerializer
-
- // 2 async producers
- val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
- val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
- // it should send to a random partition due to use of broker.list
- asyncProducer1.send(topic, "test1", -1)
- EasyMock.expectLastCall
- asyncProducer1.close
- EasyMock.expectLastCall
- EasyMock.replay(asyncProducer1)
-
- asyncProducers.put(brokerId1, asyncProducer1)
-
- val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
- val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
-
- producer.send(new ProducerData[String, String](topic, "test", Array("test1")))
- producer.close
-
- EasyMock.verify(asyncProducer1)
- }
}
class StringSerializer extends Encoder[String] {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Wed Feb 8 02:59:54 2012
@@ -28,8 +28,10 @@ import kafka.server._
import kafka.producer._
import kafka.message._
import org.I0Itec.zkclient.ZkClient
-import kafka.consumer.ConsumerConfig
import kafka.cluster.Broker
+import collection.mutable.ListBuffer
+import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
+import scala.collection.Map
/**
* Utility functions to help with testing
@@ -308,6 +310,28 @@ object TestUtils {
brokers
}
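+  // drains nMessagesPerThread messages from every stream of every topic and returns them;
+  // each message is prepended via ::=, so the list comes back in reverse consumption order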
+  def getConsumedMessages[T](nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[T]]]): List[T] = {
+ var messages: List[T] = Nil
+ for ((topic, messageStreams) <- topicMessageStreams) {
+ for (messageStream <- messageStreams) {
+ val iterator = messageStream.iterator
+ for (i <- 0 until nMessagesPerThread) {
+ assertTrue(iterator.hasNext)
+ val message = iterator.next
+ messages ::= message
+ }
+ }
+ }
+ messages
+ }
+
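+  // builds the strings "msg0" .. "msg<n-1>", e.g. getMsgStrings(3) == Seq("msg0", "msg1", "msg2")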
+ def getMsgStrings(n: Int): Seq[String] = {
+ val buffer = new ListBuffer[String]
+ for (i <- 0 until n)
+ buffer += ("msg" + i)
+ buffer
+ }
+
}
object TestZKUtils {