You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/08 03:59:55 UTC

svn commit: r1241754 [2/2] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/common/ main/scala/kafka/javaapi/ main/scala/kafka/javaapi/producer/ main/scala/kafka/javaapi/producer/async/ main/scala/kafka/producer/ main/scala/kafka/producer/...

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Wed Feb  8 02:59:54 2012
@@ -17,287 +17,385 @@
 
 package kafka.producer
 
-import junit.framework.Assert
-import java.util.Properties
 import org.easymock.EasyMock
 import kafka.api.ProducerRequest
-import org.apache.log4j.{Logger, Level}
 import org.junit.Test
 import org.scalatest.junit.JUnitSuite
 import kafka.producer.async._
-import kafka.serializer.Encoder
+import java.util.concurrent.LinkedBlockingQueue
+import junit.framework.Assert._
+import collection.SortedSet
+import kafka.cluster.{Broker, Partition}
+import collection.mutable.{HashMap, ListBuffer}
+import collection.Map
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
+import java.util.{LinkedList, Properties}
+import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.common.{InvalidConfigException, NoBrokersForPartitionException, InvalidPartitionException}
 
 class AsyncProducerTest extends JUnitSuite {
 
-  private val messageContent1 = "test"
-  private val topic1 = "test-topic"
-  private val message1: Message = new Message(messageContent1.getBytes)
-
-  private val messageContent2 = "test1"
-  private val topic2 = "test1$topic"
-  private val message2: Message = new Message(messageContent2.getBytes)
-  val asyncProducerLogger = Logger.getLogger(classOf[AsyncProducer[String]])
-
   @Test
   def testProducerQueueSize() {
-    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
-    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
-      getMessageSetOfSize(List(message1), 10)))))
-    EasyMock.expectLastCall
-    basicProducer.close
-    EasyMock.expectLastCall
-    EasyMock.replay(basicProducer)
+    // a mock event handler that blocks
+    val mockEventHandler = new EventHandler[String,String] {
 
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", "9092")
-    props.put("queue.size", "10")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    val config = new AsyncProducerConfig(props)
+      def handle(events: Seq[ProducerData[String,String]]) {
+        Thread.sleep(1000000)
+      }
 
-    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+      def close {}
+    }
 
-    //temporarily set log4j to a higher level to avoid error in the output
-    producer.setLoggerLevel(Level.FATAL)
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("broker.list", "0:localhost:9092")
+    props.put("producer.type", "async")
+    props.put("queue.size", "10")
+    props.put("batch.size", "1")
 
+    val config = new ProducerConfig(props)
+    val produceData = getProduceData(12)
+    val producer = new Producer[String, String](config, mockEventHandler)
     try {
-      for(i <- 0 until 11) {
-        producer.send(messageContent1 + "-topic", messageContent1)
-      }
-      Assert.fail("Queue should be full")
+      // send 12 messages to a queue of size 10 while the handler blocks; the queue should fill up
+      producer.send(produceData: _*)
+      fail("Queue should be full")
     }
     catch {
-      case e: QueueFullException => println("Queue is full..")
+      case e: QueueFullException => //expected
     }
-    producer.start
-    producer.close
-    Thread.sleep(2000)
-    EasyMock.verify(basicProducer)
-    producer.setLoggerLevel(Level.ERROR)
   }
 
   @Test
-  def testAddAfterQueueClosed() {
-    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
-    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
-      getMessageSetOfSize(List(message1), 10)))))
-    EasyMock.expectLastCall
-    basicProducer.close
-    EasyMock.expectLastCall
-    EasyMock.replay(basicProducer)
-
+  def testProduceAfterClosed() {
     val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", "9092")
-    props.put("queue.size", "10")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    val config = new AsyncProducerConfig(props)
-
-    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
-
-    producer.start
-    for(i <- 0 until 10) {
-      producer.send(messageContent1 + "-topic", messageContent1)
-    }
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("broker.list", "0:localhost:9092")
+    props.put("producer.type", "async")
+    props.put("batch.size", "1")
+
+    val config = new ProducerConfig(props)
+    val produceData = getProduceData(10)
+    val producer = new Producer[String, String](config)
     producer.close
 
     try {
-      producer.send(messageContent1 + "-topic", messageContent1)
-      Assert.fail("Queue should be closed")
-    } catch {
-      case e: QueueClosedException =>
+      producer.send(produceData: _*)
+      fail("should complain that producer is already closed")
+    }
+    catch {
+      case e: ProducerClosedException => //expected
     }
-    EasyMock.verify(basicProducer)
+  }
+
+  def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = {
+    val producerDataList = new ListBuffer[ProducerData[String,String]]
+    for (i <- 0 until nEvents)
+      producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i)))
+    producerDataList
   }
 
   @Test
   def testBatchSize() {
-    val basicProducer = EasyMock.createStrictMock(classOf[SyncProducer])
-    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
-      getMessageSetOfSize(List(message1), 5)))))
-    EasyMock.expectLastCall.times(2)
-    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
-      getMessageSetOfSize(List(message1), 1)))))
-    EasyMock.expectLastCall
-    basicProducer.close
-    EasyMock.expectLastCall
-    EasyMock.replay(basicProducer)
-
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", "9092")
-    props.put("queue.size", "10")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("batch.size", "5")
+    /**
+     *  Send a total of 10 messages with batch size of 5. Expect 2 calls to the handler, one for each batch.
+     */
+    val producerDataList = getProduceData(10)
+    val mockHandler = EasyMock.createStrictMock(classOf[DefaultEventHandler[String,String]])
+    mockHandler.handle(producerDataList.take(5))
+    EasyMock.expectLastCall
+    mockHandler.handle(producerDataList.takeRight(5))
+    EasyMock.expectLastCall
+    EasyMock.replay(mockHandler)
+
+    val queue = new LinkedBlockingQueue[ProducerData[String,String]](10)
+    val producerSendThread =
+      new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5)
+    producerSendThread.start()
 
-    val config = new AsyncProducerConfig(props)
+    for (producerData <- producerDataList)
+      queue.put(producerData)
 
-    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+    producerSendThread.shutdown
+    EasyMock.verify(mockHandler)
+  }
 
-    producer.start
-    for(i <- 0 until 10) {
-      producer.send(messageContent1 + "-topic", messageContent1)
-    }
+  @Test
+  def testQueueTimeExpired() {
+    /**
+     *  Send a total of 2 messages with batch size of 5 and queue time of 200ms.
+     *  Expect 1 call to the handler after 200ms.
+     */
+    val producerDataList = getProduceData(2)
+    val mockHandler = EasyMock.createStrictMock(classOf[DefaultEventHandler[String,String]])
+    mockHandler.handle(producerDataList)
+    EasyMock.expectLastCall
+    EasyMock.replay(mockHandler)
+
+    val queue = new LinkedBlockingQueue[ProducerData[String,String]](10)
+    val producerSendThread =
+      new ProducerSendThread[String,String]("thread1", queue, mockHandler, 200, 5)
+    producerSendThread.start()
 
-    Thread.sleep(100)
-    try {
-      producer.send(messageContent1 + "-topic", messageContent1)
-    } catch {
-      case e: QueueFullException =>
-        Assert.fail("Queue should not be full")
-    }
+    for (producerData <- producerDataList)
+      queue.put(producerData)
 
-    producer.close
-    EasyMock.verify(basicProducer)
+    Thread.sleep(300)
+    producerSendThread.shutdown
+    EasyMock.verify(mockHandler)
   }
 
   @Test
-  def testQueueTimeExpired() {
-    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
-    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
-      getMessageSetOfSize(List(message1), 3)))))
-    EasyMock.expectLastCall
-    basicProducer.close
-    EasyMock.expectLastCall
-    EasyMock.replay(basicProducer)
+  def testPartitionAndCollateEvents() {
+    val producerDataList = new ListBuffer[ProducerData[Int,Message]]
+    producerDataList.append(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)))
+    producerDataList.append(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
+    producerDataList.append(new ProducerData[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
+    producerDataList.append(new ProducerData[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
+    producerDataList.append(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
 
     val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", "9092")
-    props.put("queue.size", "10")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("queue.time", "200")
+    props.put("broker.list", "0:localhost:9092,1:localhost:9092")
 
-    val config = new AsyncProducerConfig(props)
-
-    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
-    val serializer = new StringSerializer
-
-    producer.start
-    for(i <- 0 until 3) {
-      producer.send(serializer.getTopic(messageContent1), messageContent1, ProducerRequest.RandomPartition)
+    val intPartitioner = new Partitioner[Int] {
+      def partition(key: Int, numPartitions: Int): Int = key % numPartitions
     }
+    val config = new ProducerConfig(props)
+    val handler = new DefaultEventHandler[Int,String](config,
+                                                      partitioner = intPartitioner,
+                                                      encoder = null.asInstanceOf[Encoder[String]],
+                                                      producerPool = null,
+                                                      populateProducerPool = false,
+                                                      brokerPartitionInfo = null)
+
+    val topic1Broker1Data = new ListBuffer[ProducerData[Int,Message]]
+    topic1Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
+                                     new ProducerData[Int,Message]("topic1", 2, new Message("msg3".getBytes))))
+    val topic1Broker2Data = new ListBuffer[ProducerData[Int,Message]]
+    topic1Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic1", 3, new Message("msg4".getBytes))))
+    val topic2Broker1Data = new ListBuffer[ProducerData[Int,Message]]
+    topic2Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes))))
+    val topic2Broker2Data = new ListBuffer[ProducerData[Int,Message]]
+    topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))))
+    val expectedResult = Map(
+        0 -> Map(
+              ("topic1", -1) -> topic1Broker1Data,
+              ("topic2", -1) -> topic2Broker1Data),
+        1 -> Map(
+              ("topic1", -1) -> topic1Broker2Data,
+              ("topic2", -1) -> topic2Broker2Data)
+      )
 
-    Thread.sleep(300)
-    producer.close
-    EasyMock.verify(basicProducer)
+    val actualResult = handler.partitionAndCollate(producerDataList)
+    assertEquals(expectedResult, actualResult)
   }
 
   @Test
-  def testSenderThreadShutdown() {
-    val syncProducerProps = new Properties()
-    syncProducerProps.put("host", "localhost")
-    syncProducerProps.put("port", "9092")
-    syncProducerProps.put("buffer.size", "1000")
-    syncProducerProps.put("connect.timeout.ms", "1000")
-    syncProducerProps.put("reconnect.interval", "1000")
-    val basicProducer = new MockProducer(new SyncProducerConfig(syncProducerProps))
-
-    val asyncProducerProps = new Properties()
-    asyncProducerProps.put("host", "localhost")
-    asyncProducerProps.put("port", "9092")
-    asyncProducerProps.put("queue.size", "10")
-    asyncProducerProps.put("serializer.class", "kafka.producer.StringSerializer")
-    asyncProducerProps.put("queue.time", "100")
-
-    val config = new AsyncProducerConfig(asyncProducerProps)
-    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
-    producer.start
-    producer.send(messageContent1 + "-topic", messageContent1)
-    producer.close
+  def testSerializeEvents() {
+    val produceData = TestUtils.getMsgStrings(5).map(m => new ProducerData[String,String]("topic1",m))
+    val props = new Properties()
+    props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+    val config = new ProducerConfig(props)
+    val handler = new DefaultEventHandler[String,String](config,
+                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         encoder = new StringEncoder,
+                                                         producerPool = null,
+                                                         populateProducerPool = false,
+                                                         brokerPartitionInfo = null)
+
+    val serializedData = handler.serialize(produceData)
+    val decoder = new StringDecoder
+    val deserializedData = serializedData.map(d => new ProducerData[String,String](d.getTopic, d.getData.map(m => decoder.toEvent(m))))
+    TestUtils.checkEquals(produceData.iterator, deserializedData.iterator)
   }
 
   @Test
-  def testCollateEvents() {
-    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
-    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, ProducerRequest.RandomPartition,
-                                                                     getMessageSetOfSize(List(message2), 5)),
-                                                 new ProducerRequest(topic1, ProducerRequest.RandomPartition,
-                                                                     getMessageSetOfSize(List(message1), 5)))))
-    EasyMock.expectLastCall
-    basicProducer.close
-    EasyMock.expectLastCall
-    EasyMock.replay(basicProducer)
+  def testInvalidPartition() {
+    val producerDataList = new ListBuffer[ProducerData[String,Message]]
+    producerDataList.append(new ProducerData[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
+    val props = new Properties()
+    props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+    val config = new ProducerConfig(props)
+    val handler = new DefaultEventHandler[String,String](config,
+                                                         partitioner = new NegativePartitioner,
+                                                         encoder = null.asInstanceOf[Encoder[String]],
+                                                         producerPool = null,
+                                                         populateProducerPool = false,
+                                                         brokerPartitionInfo = null)
+    try {
+      handler.partitionAndCollate(producerDataList)
+      fail("Should fail with InvalidPartitionException")
+    }
+    catch {
+      case e: InvalidPartitionException => // expected, do nothing
+    }
+  }
+
+  private def getMockBrokerPartitionInfo(): BrokerPartitionInfo ={
+    new BrokerPartitionInfo {
+      def getBrokerPartitionInfo(topic: String = null): SortedSet[Partition] = SortedSet.empty[Partition]
+
+      def getBrokerInfo(brokerId: Int): Option[Broker] = None
+
+      def getAllBrokerInfo: Map[Int, Broker] = new HashMap[Int, Broker]
+
+      def updateInfo = {}
 
+      def close = {}
+    }
+  }
+
+  @Test
+  def testNoBroker() {
+    val producerDataList = new ListBuffer[ProducerData[String,String]]
+    producerDataList.append(new ProducerData[String,String]("topic1", "msg1"))
     val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", "9092")
-    props.put("queue.size", "50")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("batch.size", "10")
-
-    val config = new AsyncProducerConfig(props)
-
-    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
-
-    producer.start
-    val serializer = new StringSerializer
-    for(i <- 0 until 5) {
-      producer.send(messageContent1 + "-topic", messageContent1)
-      producer.send(messageContent2 + "$topic", messageContent2, ProducerRequest.RandomPartition)
+    val config = new ProducerConfig(props)
+    val handler = new DefaultEventHandler[String,String](config,
+                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         encoder = new StringEncoder,
+                                                         producerPool = null,
+                                                         populateProducerPool = false,
+                                                         brokerPartitionInfo = getMockBrokerPartitionInfo)
+    try {
+      handler.handle(producerDataList)
+      fail("Should fail with NoBrokersForPartitionException")
+    }
+    catch {
+      case e: NoBrokersForPartitionException => // expected, do nothing
     }
+  }
 
-    producer.close
-    EasyMock.verify(basicProducer)
+  @Test
+  def testIncompatibleEncoder() {
+    val props = new Properties()
+    props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+    val config = new ProducerConfig(props)
 
+    val producer=new Producer[String, String](config)
+    try {
+      producer.send(getProduceData(1): _*)
+      fail("Should fail with ClassCastException due to incompatible Encoder")
+    } catch {
+      case e: ClassCastException =>
+    }
   }
 
   @Test
-  def testCollateAndSerializeEvents() {
-    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
-    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1,
-                                                                     getMessageSetOfSize(List(message2), 5)),
-                                                 new ProducerRequest(topic1, 0,
-                                                                     getMessageSetOfSize(List(message1), 5)),
-                                                 new ProducerRequest(topic1, 1,
-                                                                     getMessageSetOfSize(List(message1), 5)),
-                                                 new ProducerRequest(topic2, 0,
-                                                                     getMessageSetOfSize(List(message2), 5)))))
+  def testRandomPartitioner() {
+    val props = new Properties()
+    props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+    val config = new ProducerConfig(props)
+    val handler = new DefaultEventHandler[String,String](config,
+                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         encoder = null.asInstanceOf[Encoder[String]],
+                                                         producerPool = null,
+                                                         populateProducerPool = false,
+                                                         brokerPartitionInfo = null)
+    val producerDataList = new ListBuffer[ProducerData[String,Message]]
+    producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg1".getBytes)))
+    producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes)))
+    producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg3".getBytes)))
+
+    val partitionedData = handler.partitionAndCollate(producerDataList)
+    for ((brokerId, dataPerBroker) <- partitionedData) {
+      for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker)
+        assertTrue(partitionId == ProducerRequest.RandomPartition)
+    }
+  }
 
+  @Test
+  def testBrokerListAndAsync() {
+    val topic = "topic1"
+    val msgs = TestUtils.getMsgStrings(10)
+    val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
+    mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, ProducerRequest.RandomPartition,
+      messagesToSet(msgs.take(5))))))
     EasyMock.expectLastCall
-    basicProducer.close
+    mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, ProducerRequest.RandomPartition,
+      messagesToSet(msgs.takeRight(5))))))
     EasyMock.expectLastCall
-    EasyMock.replay(basicProducer)
+    mockSyncProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(mockSyncProducer)
 
     val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", "9092")
-    props.put("queue.size", "50")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("batch.size", "20")
-
-    val config = new AsyncProducerConfig(props)
-
-    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
-
-    producer.start
-    val serializer = new StringSerializer
-    for(i <- 0 until 5) {
-      producer.send(topic2, messageContent2, 0)
-      producer.send(topic2, messageContent2, 1)
-      producer.send(topic1, messageContent1, 0)
-      producer.send(topic1, messageContent1, 1)
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("producer.type", "async")
+    props.put("batch.size", "5")
+    props.put("broker.list", "0:localhost:9092")
+
+    val config = new ProducerConfig(props)
+    val producerPool = new ProducerPool(config)
+    producerPool.addProducer(0, mockSyncProducer)
+
+    val handler = new DefaultEventHandler[String,String](config,
+                                                      partitioner = null.asInstanceOf[Partitioner[String]],
+                                                      encoder = new StringEncoder,
+                                                      producerPool = producerPool,
+                                                      populateProducerPool = false,
+                                                      brokerPartitionInfo = null)
+
+    val producer = new Producer[String, String](config, handler)
+    try {
+      // send all 10 messages, should create 2 batches and 2 syncproducer calls
+      producer.send(msgs.map(m => new ProducerData[String,String](topic, List(m))): _*)
+      producer.close
+
+    } catch {
+      case e: Exception => fail("Not expected", e)
     }
 
-    producer.close
-    EasyMock.verify(basicProducer)
+    EasyMock.verify(mockSyncProducer)
+  }
 
+  @Test
+  def testJavaProducer() {
+    val topic = "topic1"
+    val msgs = TestUtils.getMsgStrings(5)
+    val scalaProducerData = msgs.map(m => new ProducerData[String, String](topic, List(m)))
+    val javaProducerData = scala.collection.JavaConversions.asList(msgs.map(m => {
+        val javaList = new LinkedList[String]()
+        javaList.add(m)
+        new kafka.javaapi.producer.ProducerData[String, String](topic, javaList)
+      }))
+
+    val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]])
+    mockScalaProducer.send(scalaProducerData.head)
+    EasyMock.expectLastCall()
+    mockScalaProducer.send(scalaProducerData: _*)
+    EasyMock.expectLastCall()
+    EasyMock.replay(mockScalaProducer)
+
+    val javaProducer = new kafka.javaapi.producer.Producer[String, String](mockScalaProducer)
+    javaProducer.send(javaProducerData.get(0))
+    javaProducer.send(javaProducerData)
+
+    EasyMock.verify(mockScalaProducer)
   }
 
-  private def getMessageSetOfSize(messages: List[Message], counts: Int): ByteBufferMessageSet = {
-    var messageList = new Array[Message](counts)
-    for(message <- messages) {
-      for(i <- 0 until counts) {
-        messageList(i) = message
-      }
+  @Test
+  def testInvalidConfiguration() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("broker.list", "0:localhost:9092")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("producer.type", "async")
+
+    try {
+      new ProducerConfig(props)
+      fail("should complain about wrong config")
+    }
+    catch {
+      case e: InvalidConfigException => //expected
     }
-    new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
   }
 
-  class StringSerializer extends Encoder[String] {
-    def toMessage(event: String):Message = new Message(event.getBytes)
-    def getTopic(event: String): String = event.concat("-topic")
+  private def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
+    val encoder = new StringEncoder
+    new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
   }
 
   class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Wed Feb  8 02:59:54 2012
@@ -17,23 +17,18 @@
 
 package kafka.producer
 
-import async.AsyncProducer
-import java.util.Properties
 import org.apache.log4j.{Logger, Level}
-import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
 import kafka.zk.EmbeddedZookeeper
 import org.junit.{After, Before, Test}
-import junit.framework.Assert
-import org.easymock.EasyMock
-import java.util.concurrent.ConcurrentHashMap
-import kafka.cluster.Partition
+import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
-import kafka.common.{InvalidConfigException, UnavailableProducerException, InvalidPartitionException}
 import kafka.utils.{TestUtils, TestZKUtils, Utils}
-import kafka.serializer.{StringEncoder, Encoder}
-import kafka.consumer.SimpleConsumer
 import kafka.api.FetchRequest
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.message.Message
+import kafka.serializer.Encoder
+import kafka.consumer.SimpleConsumer
+import java.util.Properties
+import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
 
 class ProducerTest extends JUnitSuite {
   private val topic = "test-topic"
@@ -43,8 +38,6 @@ class ProducerTest extends JUnitSuite {
   private val (port1, port2) = (ports(0), ports(1))
   private var server1: KafkaServer = null
   private var server2: KafkaServer = null
-  private var producer1: SyncProducer = null
-  private var producer2: SyncProducer = null
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
   private var zkServer:EmbeddedZookeeper = null
@@ -71,16 +64,6 @@ class ProducerTest extends JUnitSuite {
     props.put("host", "localhost")
     props.put("port", port1.toString)
 
-    producer1 = new SyncProducer(new SyncProducerConfig(props))
-    producer1.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                          messages = new Message("test".getBytes())))
-
-    producer2 = new SyncProducer(new SyncProducerConfig(props) {
-      override val port = port2
-    })
-    producer2.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                          messages = new Message("test".getBytes())))
-
     consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024)
     consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024)
 
@@ -104,293 +87,6 @@ class ProducerTest extends JUnitSuite {
   }
 
   @Test
-  def testSend() {
-    val props = new Properties()
-    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
-    val config = new ProducerConfig(props)
-    val partitioner = new StaticPartitioner
-    val serializer = new StringSerializer
-
-    // 2 sync producers
-    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
-    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
-    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
-    // it should send to partition 0 (first partition) on second broker i.e broker2
-    syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes)))
-    EasyMock.expectLastCall
-    syncProducer1.close
-    EasyMock.expectLastCall
-    syncProducer2.close
-    EasyMock.expectLastCall
-    EasyMock.replay(syncProducer1)
-    EasyMock.replay(syncProducer2)
-
-    syncProducers.put(brokerId1, syncProducer1)
-    syncProducers.put(brokerId2, syncProducer2)
-
-    val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
-    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
-
-    producer.send(new ProducerData[String, String](topic, "test", Array("test1")))
-    producer.close
-
-    EasyMock.verify(syncProducer1)
-    EasyMock.verify(syncProducer2)
-  }
-
-  @Test
-  def testSendSingleMessage() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("broker.list", "0:localhost:9092")
-
-
-    val config = new ProducerConfig(props)
-    val partitioner = new StaticPartitioner
-    val serializer = new StringSerializer
-
-    // 2 sync producers
-    val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
-    val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
-    // it should send to a random partition due to use of broker.list
-    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
-    EasyMock.expectLastCall
-    syncProducer1.close
-    EasyMock.expectLastCall
-    EasyMock.replay(syncProducer1)
-
-    syncProducers.put(brokerId1, syncProducer1)
-
-    val producerPool = new ProducerPool[String](config, serializer, syncProducers,
-      new ConcurrentHashMap[Int, AsyncProducer[String]]())
-    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
-
-    producer.send(new ProducerData[String, String](topic, "t"))
-    producer.close
-
-    EasyMock.verify(syncProducer1)
-  }
-
-  @Test
-  def testInvalidPartition() {
-    val props = new Properties()
-    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
-    val config = new ProducerConfig(props)
-
-    val richProducer = new Producer[String, String](config)
-    try {
-      richProducer.send(new ProducerData[String, String](topic, "test", Array("test")))
-      Assert.fail("Should fail with InvalidPartitionException")
-    }catch {
-      case e: InvalidPartitionException => // expected, do nothing
-    }
-  }
-
-  @Test
-  def testDefaultEncoder() {
-    val props = new Properties()
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
-    val config = new ProducerConfig(props)
-
-    val stringProducer1 = new Producer[String, String](config)
-    try {
-      stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test")))
-      fail("Should fail with ClassCastException due to incompatible Encoder")
-    } catch {
-      case e: ClassCastException =>
-    }
-
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    val stringProducer2 = new Producer[String, String](new ProducerConfig(props))
-    stringProducer2.send(new ProducerData[String, String](topic, "test", Array("test")))
-
-    val messageProducer1 = new Producer[String, Message](config)
-    try {
-      messageProducer1.send(new ProducerData[String, Message](topic, "test", Array(new Message("test".getBytes))))
-    } catch {
-      case e: ClassCastException => fail("Should not fail with ClassCastException due to default Encoder")
-    }
-  }
-
-  @Test
-  def testSyncProducerPool() {
-    // 2 sync producers
-    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
-    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
-    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
-    syncProducer1.send("test-topic", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes)))
-    EasyMock.expectLastCall
-    syncProducer1.close
-    EasyMock.expectLastCall
-    syncProducer2.close
-    EasyMock.expectLastCall
-    EasyMock.replay(syncProducer1)
-    EasyMock.replay(syncProducer2)
-
-    syncProducers.put(brokerId1, syncProducer1)
-    syncProducers.put(brokerId2, syncProducer2)
-
-    // default for producer.type is "sync"
-    val props = new Properties()
-    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
-      syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
-    producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
-
-    producerPool.close
-    EasyMock.verify(syncProducer1)
-    EasyMock.verify(syncProducer2)
-  }
-
-  @Test
-  def testAsyncProducerPool() {
-    // 2 async producers
-    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
-    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    asyncProducer1.send(topic, "test1", 0)
-    EasyMock.expectLastCall
-    asyncProducer1.close
-    EasyMock.expectLastCall
-    asyncProducer2.close
-    EasyMock.expectLastCall
-    EasyMock.replay(asyncProducer1)
-    EasyMock.replay(asyncProducer2)
-
-    asyncProducers.put(brokerId1, asyncProducer1)
-    asyncProducers.put(brokerId2, asyncProducer2)
-
-    // change producer.type to "async"
-    val props = new Properties()
-    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("producer.type", "async")
-    val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
-      new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
-    producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
-
-    producerPool.close
-    EasyMock.verify(asyncProducer1)
-    EasyMock.verify(asyncProducer2)
-  }
-
-  @Test
-  def testSyncUnavailableProducerException() {
-    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
-    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
-    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
-    syncProducer2.close
-    EasyMock.expectLastCall
-    EasyMock.replay(syncProducer1)
-    EasyMock.replay(syncProducer2)
-
-    syncProducers.put(brokerId2, syncProducer2)
-
-    // default for producer.type is "sync"
-    val props = new Properties()
-    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
-      syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
-    try {
-      producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
-      Assert.fail("Should fail with UnavailableProducerException")
-    }catch {
-      case e: UnavailableProducerException => // expected
-    }
-
-    producerPool.close
-    EasyMock.verify(syncProducer1)
-    EasyMock.verify(syncProducer2)
-  }
-
-  @Test
-  def testAsyncUnavailableProducerException() {
-    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
-    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    asyncProducer2.close
-    EasyMock.expectLastCall
-    EasyMock.replay(asyncProducer1)
-    EasyMock.replay(asyncProducer2)
-
-    asyncProducers.put(brokerId2, asyncProducer2)
-
-    // change producer.type to "async"
-    val props = new Properties()
-    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("producer.type", "async")
-    val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
-      new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
-    try {
-      producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
-      Assert.fail("Should fail with UnavailableProducerException")
-    }catch {
-      case e: UnavailableProducerException => // expected
-    }
-
-    producerPool.close
-    EasyMock.verify(asyncProducer1)
-    EasyMock.verify(asyncProducer2)
-  }
-
-  @Test
-  def testConfigBrokerPartitionInfoWithPartitioner {
-    val props = new Properties()
-    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("producer.type", "async")
-    props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1 + ":" + 4 + "," +
-                                       brokerId2 + ":" + "localhost" + ":" + port2 + ":" + 4)
-
-    var config: ProducerConfig = null
-    try {
-      config = new ProducerConfig(props)
-      fail("should fail with InvalidConfigException due to presence of partitioner.class and broker.list")
-    }catch {
-      case e: InvalidConfigException => // expected
-    }
-  }
-
-  @Test
-  def testConfigBrokerPartitionInfo() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("producer.type", "async")
-    props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
-
-    val config = new ProducerConfig(props)
-    val partitioner = new StaticPartitioner
-    val serializer = new StringSerializer
-
-    // 2 async producers
-    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
-    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    // it should send to a random partition due to use of broker.list
-    asyncProducer1.send(topic, "test1", -1)
-    EasyMock.expectLastCall
-    asyncProducer1.close
-    EasyMock.expectLastCall
-    EasyMock.replay(asyncProducer1)
-
-    asyncProducers.put(brokerId1, asyncProducer1)
-
-    val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
-    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
-
-    producer.send(new ProducerData[String, String](topic, "test1", Array("test1")))
-    producer.close
-
-    EasyMock.verify(asyncProducer1)
-  }
-
-  @Test
   def testZKSendToNewTopic() {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
@@ -398,7 +94,6 @@ class ProducerTest extends JUnitSuite {
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new ProducerConfig(props)
-    val serializer = new StringEncoder
 
     val producer = new Producer[String, String](config)
     try {
@@ -412,11 +107,11 @@ class ProducerTest extends JUnitSuite {
       Thread.sleep(100)
       // cross check if brokers got the messages
       val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
-      Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet1.next.message)
       val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
-      Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
-      Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message)
+      assertTrue("Message set should have 1 message", messageSet2.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet2.next.message)
     } catch {
       case e: Exception => fail("Not expected", e)
     }
@@ -431,7 +126,6 @@ class ProducerTest extends JUnitSuite {
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new ProducerConfig(props)
-    val serializer = new StringEncoder
 
     val producer = new Producer[String, String](config)
     try {
@@ -449,10 +143,10 @@ class ProducerTest extends JUnitSuite {
       Thread.sleep(100)
       // cross check if brokers got the messages
       val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
-      Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
-      Assert.assertTrue("Message set should have another message", messageSet1.hasNext)
-      Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      assertTrue("Message set should have another message", messageSet1.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet1.next.message)
     } catch {
       case e: Exception => fail("Not expected")
     }
@@ -467,7 +161,6 @@ class ProducerTest extends JUnitSuite {
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new ProducerConfig(props)
-    val serializer = new StringEncoder
 
     val producer = new Producer[String, String](config)
     var server: KafkaServer = null
@@ -482,8 +175,8 @@ class ProducerTest extends JUnitSuite {
       Thread.sleep(100)
       // cross check if brokers got the messages
       val messageSet1 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
-      Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      Assert.assertEquals(new Message("test".getBytes), messageSet1.next.message)
+      assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      assertEquals(new Message("test".getBytes), messageSet1.next.message)
 
       // shutdown server2
       server2.shutdown
@@ -505,8 +198,8 @@ class ProducerTest extends JUnitSuite {
 
       // cross check if brokers got the messages
       val messageSet2 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
-      Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
-      Assert.assertEquals(new Message("test".getBytes), messageSet2.next.message)
+      assertTrue("Message set should have 1 message", messageSet2.hasNext)
+      assertEquals(new Message("test".getBytes), messageSet2.next.message)
 
     } catch {
       case e: Exception => fail("Not expected", e)
@@ -516,152 +209,6 @@ class ProducerTest extends JUnitSuite {
     }
   }
 
-  @Test
-  def testPartitionedSendToNewTopic() {
-    val props = new Properties()
-    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
-
-    val config = new ProducerConfig(props)
-    val partitioner = new StaticPartitioner
-    val serializer = new StringSerializer
-
-    // 2 sync producers
-    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
-    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
-    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
-    syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                                  messages = new Message("test1".getBytes)))
-    EasyMock.expectLastCall
-    syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                                  messages = new Message("test1".getBytes)))
-    EasyMock.expectLastCall
-    syncProducer1.close
-    EasyMock.expectLastCall
-    syncProducer2.close
-    EasyMock.expectLastCall
-    EasyMock.replay(syncProducer1)
-    EasyMock.replay(syncProducer2)
-
-    syncProducers.put(brokerId1, syncProducer1)
-    syncProducers.put(brokerId2, syncProducer2)
-
-    val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
-    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
-
-    producer.send(new ProducerData[String, String]("test-topic1", "test", Array("test1")))
-    Thread.sleep(100)
-
-    // now send again to this topic using a real producer, this time all brokers would have registered
-    // their partitions in zookeeper
-    producer1.send("test-topic1", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                           messages = new Message("test".getBytes())))
-    Thread.sleep(100)
-
-    // wait for zookeeper to register the new topic
-    producer.send(new ProducerData[String, String]("test-topic1", "test1", Array("test1")))
-    Thread.sleep(100)
-    producer.close
-
-    EasyMock.verify(syncProducer1)
-    EasyMock.verify(syncProducer2)
-  }
-
-  @Test
-  def testPartitionedSendToNewBrokerInExistingTopic() {
-    val props = new Properties()
-    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
-
-    val config = new ProducerConfig(props)
-    val partitioner = new StaticPartitioner
-    val serializer = new StringSerializer
-
-    // 2 sync producers
-    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
-    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
-    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
-    val syncProducer3 = EasyMock.createMock(classOf[SyncProducer])
-    syncProducer3.send("test-topic", 2, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                                 messages = new Message("test1".getBytes)))
-    EasyMock.expectLastCall
-    syncProducer1.close
-    EasyMock.expectLastCall
-    syncProducer2.close
-    EasyMock.expectLastCall
-    syncProducer3.close
-    EasyMock.expectLastCall
-    EasyMock.replay(syncProducer1)
-    EasyMock.replay(syncProducer2)
-    EasyMock.replay(syncProducer3)
-
-    syncProducers.put(brokerId1, syncProducer1)
-    syncProducers.put(brokerId2, syncProducer2)
-    syncProducers.put(2, syncProducer3)
-
-    val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
-    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
-
-    val port = TestUtils.choosePort
-    val serverProps = TestUtils.createBrokerConfig(2, port)
-    val serverConfig = new KafkaConfig(serverProps) {
-      override val numPartitions = 4
-    }
-
-    val server3 = TestUtils.createServer(serverConfig)
-    Thread.sleep(500)
-    // send a message to the new broker to register it under topic "test-topic"
-    val tempProps = new Properties()
-    tempProps.put("host", "localhost")
-    tempProps.put("port", port.toString)
-    val tempProducer = new SyncProducer(new SyncProducerConfig(tempProps))
-    tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                             messages = new Message("test".getBytes())))
-
-    Thread.sleep(500)
-    producer.send(new ProducerData[String, String]("test-topic", "test-topic", Array("test1")))
-    producer.close
-
-    EasyMock.verify(syncProducer1)
-    EasyMock.verify(syncProducer2)
-    EasyMock.verify(syncProducer3)
-
-    server3.shutdown
-    Utils.rm(server3.config.logDir)
-  }
-
-  @Test
-  def testDefaultPartitioner() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.producer.StringSerializer")
-    props.put("producer.type", "async")
-    props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
-    val config = new ProducerConfig(props)
-    val partitioner = new DefaultPartitioner[String]
-    val serializer = new StringSerializer
-
-    // 2 async producers
-    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
-    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    // it should send to a random partition due to use of broker.list
-    asyncProducer1.send(topic, "test1", -1)
-    EasyMock.expectLastCall
-    asyncProducer1.close
-    EasyMock.expectLastCall
-    EasyMock.replay(asyncProducer1)
-
-    asyncProducers.put(brokerId1, asyncProducer1)
-
-    val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
-    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
-
-    producer.send(new ProducerData[String, String](topic, "test", Array("test1")))
-    producer.close
-
-    EasyMock.verify(asyncProducer1)
-  }
 }
 
 class StringSerializer extends Encoder[String] {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1241754&r1=1241753&r2=1241754&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Wed Feb  8 02:59:54 2012
@@ -28,8 +28,10 @@ import kafka.server._
 import kafka.producer._
 import kafka.message._
 import org.I0Itec.zkclient.ZkClient
-import kafka.consumer.ConsumerConfig
 import kafka.cluster.Broker
+import collection.mutable.ListBuffer
+import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
+import scala.collection.Map
 
 /**
  * Utility functions to help with testing
@@ -308,6 +310,28 @@ object TestUtils {
     brokers
   }
 
+  // Pulls nMessagesPerThread messages from each stream of each topic; every message is prepended,
+  // so the returned list holds them in reverse order of consumption.
+  def getConsumedMessages[T](nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[T]]]): List[T]= {
+    var consumed: List[T] = Nil
+    for ((_, streams) <- topicMessageStreams; stream <- streams) {
+      val it = stream.iterator
+      (0 until nMessagesPerThread).foreach { _ =>
+        // each expected message must be available on the stream
+        assertTrue(it.hasNext)
+        consumed = it.next :: consumed
+      }
+    }
+    consumed
+  }
+
+  // Builds the strings "msg0" .. "msg<n-1>" in ascending index order.
+  def getMsgStrings(n: Int): Seq[String] = {
+    val msgs = new ListBuffer[String]
+    (0 until n).foreach(idx => msgs += ("msg" + idx))
+    msgs
+  }
+
 }
 
 object TestZKUtils {