Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/02 01:42:17 UTC

svn commit: r1152970 [23/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.util.Arrays
+import junit.framework.Assert._
+import kafka.utils.TestUtils._
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+
+trait BaseMessageSetTestCases extends JUnitSuite {
+  
+  val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
+  
+  def createMessageSet(messages: Seq[Message]): MessageSet
+
+  @Test
+  def testWrittenEqualsRead() {
+    val messageSet = createMessageSet(messages)
+    checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
+  }
+
+  @Test
+  def testIteratorIsConsistent() {
+    val m = createMessageSet(messages)
+    // two iterators over the same set should give the same results
+    checkEquals(m.iterator, m.iterator)
+  }
+
+  @Test
+  def testSizeInBytes() {
+    assertEquals("Empty message set should have 0 bytes.",
+                 0L,
+                 createMessageSet(Array[Message]()).sizeInBytes)
+    assertEquals("Predicted size should equal actual size.", 
+                 MessageSet.messageSetSize(messages).toLong, 
+                 createMessageSet(messages).sizeInBytes)
+  }
+
+  @Test
+  def testWriteTo() {
+    // test empty message set
+    testWriteToWithMessageSet(createMessageSet(Array[Message]()))
+    testWriteToWithMessageSet(createMessageSet(messages))
+  }
+
+  def testWriteToWithMessageSet(set: MessageSet) {
+    val channel = tempChannel()
+    val written = set.writeTo(channel, 0, 1024)
+    assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
+    val newSet = new FileMessageSet(channel, false)
+    checkEquals(set.iterator, newSet.iterator)
+  }
+  
+}
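
A note on the pattern: BaseMessageSetTestCases is a trait with one abstract factory method, so every MessageSet implementation can inherit the same checks (write/read round-trip, iterator consistency, size accounting, writeTo). A minimal sketch of how a new suite plugs in; the MyMessageSetTest name is hypothetical, and ByteBufferMessageSet stands in for the implementation under test:

    package kafka.message

    // Hypothetical suite: mix in the trait and provide the single factory hook.
    class MyMessageSetTest extends BaseMessageSetTestCases {
      override def createMessageSet(messages: Seq[Message]): MessageSet =
        new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
    }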

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.nio._
+import junit.framework.Assert._
+import org.junit.Test
+
+class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
+
+  override def createMessageSet(messages: Seq[Message]): ByteBufferMessageSet = 
+    new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+  
+  @Test
+  def testValidBytes() {
+    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+    val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
+    buffer.put(messages.serialized)
+    buffer.putShort(4)
+    val messagesPlus = new ByteBufferMessageSet(buffer)
+    assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
+  }
+
+  @Test
+  def testEquals() {
+    var messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+    var moreMessages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+
+    assertTrue(messages.equals(moreMessages))
+
+    messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+    moreMessages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+
+    assertTrue(messages.equals(moreMessages))
+  }
+  
+}
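
testValidBytes above depends on validBytes counting only the bytes that form complete messages: the trailing two-byte short can never parse as a message, so it is excluded. The invariant in isolation, as a REPL-style sketch using only calls that appear in the test:

    import java.nio.ByteBuffer
    import kafka.message.{Message, ByteBufferMessageSet, NoCompressionCodec}

    val full = new ByteBufferMessageSet(NoCompressionCodec, new Message("a".getBytes))
    val buffer = ByteBuffer.allocate(full.sizeInBytes.toInt + 2)
    buffer.put(full.serialized)  // the complete, parseable messages
    buffer.putShort(4)           // a trailing fragment too short to be a message
    val withJunk = new ByteBufferMessageSet(buffer)
    // validBytes stops at the last complete message; the fragment is ignored
    assert(withJunk.validBytes == full.validBytes)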

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,41 @@
+package kafka.message
+
+import junit.framework.TestCase
+import kafka.utils.TestUtils
+
+class CompressionUtilsTest extends TestCase {
+
+  def testSimpleCompressDecompress() {
+
+    val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes))
+
+    val message = CompressionUtils.compress(messages)
+
+    val decompressedMessages = CompressionUtils.decompress(message)
+
+    TestUtils.checkLength(decompressedMessages.iterator, 3)
+
+    TestUtils.checkEquals(messages.iterator, decompressedMessages.iterator)
+  }
+
+  def testComplexCompressDecompress() {
+
+    val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes))
+
+    val message = CompressionUtils.compress(messages.slice(0, 2))
+
+    val complexMessages = List[Message](message):::messages.slice(2,3)
+
+    val complexMessage = CompressionUtils.compress(complexMessages)
+
+    val decompressedMessages = CompressionUtils.decompress(complexMessage)
+
+    // the iterator expands the nested compressed message, so all three
+    // original messages come back out
+    TestUtils.checkLength(decompressedMessages.iterator, 3)
+
+    TestUtils.checkEquals(messages.iterator, decompressedMessages.iterator)
+  }
+}
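
For context: CompressionUtils.compress wraps a whole message list inside a single compressed envelope Message, and decompress expands an envelope back into a message set; envelopes can contain other envelopes, which is what the complex test exercises. The basic round trip as a sketch:

    import kafka.message.{Message, CompressionUtils}

    val batch = List(new Message("one".getBytes), new Message("two".getBytes))
    val envelope = CompressionUtils.compress(batch)       // one compressed Message
    val restored = CompressionUtils.decompress(envelope)  // back to a message set
    // iterating the decompressed set yields the original messages, in order
    assert(restored.iterator.toList.size == 2)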

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.nio._
+import java.util.Arrays
+import junit.framework.TestCase
+import junit.framework.Assert._
+import kafka.utils.TestUtils._
+import org.junit.Test
+import kafka.message._
+
+class FileMessageSetTest extends BaseMessageSetTestCases {
+  
+  val messageSet = createMessageSet(messages)
+  
+  def createMessageSet(messages: Seq[Message]): FileMessageSet = {
+    val set = new FileMessageSet(tempFile(), true)
+    set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
+    set.flush()
+    set
+  }
+
+  @Test
+  def testFileSize() {
+    assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
+    messageSet.append(singleMessageSet("abcd".getBytes()))
+    assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
+  }
+  
+  @Test
+  def testIterationOverPartialAndTruncation() {
+    testPartialWrite(0, messageSet)
+    testPartialWrite(2, messageSet)
+    testPartialWrite(4, messageSet)
+    testPartialWrite(5, messageSet)
+    testPartialWrite(6, messageSet)
+  }
+  
+  def testPartialWrite(size: Int, messageSet: FileMessageSet) {
+    val buffer = ByteBuffer.allocate(size)
+    val originalPosition = messageSet.channel.position
+    for(i <- 0 until size)
+      buffer.put(0.asInstanceOf[Byte])
+    buffer.rewind()
+    messageSet.channel.write(buffer)
+    // appending those bytes should not change the contents
+    checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
+    assertEquals("Unexpected number of bytes truncated", size.longValue, messageSet.recover())
+    assertEquals("File pointer should now be at the end of the file.", originalPosition, messageSet.channel.position)
+    // nor should recovery change the contents
+    checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
+  }
+  
+  @Test
+  def testIterationDoesntChangePosition() {
+    val position = messageSet.channel.position
+    checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
+    assertEquals(position, messageSet.channel.position)
+  }
+  
+  @Test
+  def testRead() {
+    val read = messageSet.read(0, messageSet.sizeInBytes)
+    checkEquals(messageSet.iterator, read.iterator)
+    val items = read.iterator.toList
+    val first = items.head
+    val read2 = messageSet.read(first.offset, messageSet.sizeInBytes)
+    checkEquals(items.tail.iterator, read2.iterator)
+  }
+  
+}
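
testPartialWrite pins down the contract of FileMessageSet.recover(): scan the log, truncate any trailing bytes that do not form a complete message, return the number of bytes dropped, and leave the file pointer at the new end of file. A minimal sketch of that crash-recovery path, assuming the same TestUtils helpers used above:

    import java.nio.ByteBuffer
    import kafka.message.{FileMessageSet, ByteBufferMessageSet, Message, NoCompressionCodec}
    import kafka.utils.TestUtils._

    val set = new FileMessageSet(tempFile(), true)  // a mutable log file
    set.append(new ByteBufferMessageSet(NoCompressionCodec, new Message("x".getBytes)))
    set.flush()
    // simulate a crash that left three stray bytes at the end of the log
    set.channel.write(ByteBuffer.wrap(Array[Byte](0, 0, 0)))
    assert(set.recover() == 3L)  // recover() returns the number of bytes truncated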

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.util._
+import java.nio._
+import java.nio.channels._
+import java.io._
+import junit.framework.TestCase
+import junit.framework.Assert._
+import org.scalatest.junit.JUnitSuite
+import org.junit.{Before, Test}
+import kafka.utils.TestUtils
+
+class MessageTest extends JUnitSuite {
+  
+  var message: Message = null
+  val payload = "some bytes".getBytes()
+
+  @Before
+  def setUp(): Unit = {
+    message = new Message(payload)
+  }
+  
+  @Test
+  def testFieldValues() = {
+    TestUtils.checkEquals(ByteBuffer.wrap(payload), message.payload)
+    assertEquals(Message.CurrentMagicValue, message.magic)
+    assertEquals(69L, new Message(69, "hello".getBytes()).checksum)
+  }
+
+  @Test
+  def testChecksum() {
+    assertTrue("Auto-computed checksum should be valid", message.isValid)
+    val badChecksum = (message.checksum + 1) % Int.MaxValue
+    val invalid = new Message(badChecksum, payload)
+    assertEquals("Message should return written checksum", badChecksum, invalid.checksum)
+    assertFalse("Message with invalid checksum should be invalid", invalid.isValid)
+  }
+  
+  @Test
+  def testEquality() = {
+    assertFalse("Should not equal null", message.equals(null))
+    assertFalse("Should not equal a random string", message.equals("asdf"))
+    assertTrue("Should equal itself", message.equals(message))
+    val copy = new Message(message.checksum, payload)
+    assertTrue("Should equal another message with the same content.", message.equals(copy))
+  }
+  
+  @Test
+  def testIsHashable() = {
+    // this is silly, but why not
+    val m = new HashMap[Message,Boolean]()
+    m.put(message, true)
+    assertNotNull(m.get(message))
+  }
+  
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import junit.framework.{Assert, TestCase}
+import java.util.Properties
+import org.easymock.EasyMock
+import kafka.api.ProducerRequest
+import org.apache.log4j.{Logger, Level}
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+import kafka.producer.async._
+import kafka.serializer.Encoder
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+
+class AsyncProducerTest extends JUnitSuite {
+
+  private val messageContent1 = "test"
+  private val topic1 = "test-topic"
+  private val message1: Message = new Message(messageContent1.getBytes)
+
+  private val messageContent2 = "test1"
+  private val topic2 = "test1$topic"
+  private val message2: Message = new Message(messageContent2.getBytes)
+  val asyncProducerLogger = Logger.getLogger(classOf[AsyncProducer[String]])
+
+  @Test
+  def testProducerQueueSize() {
+    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
+      getMessageSetOfSize(List(message1), 10)))))
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9092")
+    props.put("queue.size", "10")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    val config = new AsyncProducerConfig(props)
+
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+    //temporarily set log4j to a higher level to avoid error in the output
+    producer.setLoggerLevel(Level.FATAL)
+
+    try {
+      for(i <- 0 until 11) {
+        producer.send(messageContent1 + "-topic", messageContent1)
+      }
+      Assert.fail("Queue should be full")
+    }
+    catch {
+      case e: QueueFullException => println("Queue is full..")
+    }
+    producer.start
+    producer.close
+    Thread.sleep(2000)
+    EasyMock.verify(basicProducer)
+    producer.setLoggerLevel(Level.ERROR)
+  }
+
+  @Test
+  def testAddAfterQueueClosed() {
+    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
+      getMessageSetOfSize(List(message1), 10)))))
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9092")
+    props.put("queue.size", "10")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    val config = new AsyncProducerConfig(props)
+
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+    producer.start
+    for(i <- 0 until 10) {
+      producer.send(messageContent1 + "-topic", messageContent1)
+    }
+    producer.close
+
+    try {
+      producer.send(messageContent1 + "-topic", messageContent1)
+      Assert.fail("Queue should be closed")
+    } catch {
+      case e: QueueClosedException =>
+    }
+    EasyMock.verify(basicProducer)
+  }
+
+  @Test
+  def testBatchSize() {
+    val basicProducer = EasyMock.createStrictMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
+      getMessageSetOfSize(List(message1), 5)))))
+    EasyMock.expectLastCall.times(2)
+    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
+      getMessageSetOfSize(List(message1), 1)))))
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9092")
+    props.put("queue.size", "10")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("batch.size", "5")
+
+    val config = new AsyncProducerConfig(props)
+
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+    producer.start
+    for(i <- 0 until 10) {
+      producer.send(messageContent1 + "-topic", messageContent1)
+    }
+
+    Thread.sleep(100)
+    try {
+      producer.send(messageContent1 + "-topic", messageContent1)
+    } catch {
+      case e: QueueFullException =>
+        Assert.fail("Queue should not be full")
+    }
+
+    producer.close
+    EasyMock.verify(basicProducer)
+  }
+
+  @Test
+  def testQueueTimeExpired() {
+    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition,
+      getMessageSetOfSize(List(message1), 3)))))
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9092")
+    props.put("queue.size", "10")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("queue.time", "200")
+
+    val config = new AsyncProducerConfig(props)
+
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+    val serializer = new StringSerializer
+
+    producer.start
+    for(i <- 0 until 3) {
+      producer.send(serializer.getTopic(messageContent1), messageContent1, ProducerRequest.RandomPartition)
+    }
+
+    Thread.sleep(300)
+    producer.close
+    EasyMock.verify(basicProducer)
+  }
+
+  @Test
+  def testSenderThreadShutdown() {
+    val syncProducerProps = new Properties()
+    syncProducerProps.put("host", "localhost")
+    syncProducerProps.put("port", "9092")
+    syncProducerProps.put("buffer.size", "1000")
+    syncProducerProps.put("connect.timeout.ms", "1000")
+    syncProducerProps.put("reconnect.interval", "1000")
+    val basicProducer = new MockProducer(new SyncProducerConfig(syncProducerProps))
+
+    val asyncProducerProps = new Properties()
+    asyncProducerProps.put("host", "localhost")
+    asyncProducerProps.put("port", "9092")
+    asyncProducerProps.put("queue.size", "10")
+    asyncProducerProps.put("serializer.class", "kafka.producer.StringSerializer")
+    asyncProducerProps.put("queue.time", "100")
+
+    val config = new AsyncProducerConfig(asyncProducerProps)
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+    producer.start
+    producer.send(messageContent1 + "-topic", messageContent1)
+    producer.close
+  }
+
+  @Test
+  def testCollateEvents() {
+    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, ProducerRequest.RandomPartition,
+                                                                     getMessageSetOfSize(List(message2), 5)),
+                                                 new ProducerRequest(topic1, ProducerRequest.RandomPartition,
+                                                                     getMessageSetOfSize(List(message1), 5)))))
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9092")
+    props.put("queue.size", "50")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("batch.size", "10")
+
+    val config = new AsyncProducerConfig(props)
+
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+    producer.start
+    val serializer = new StringSerializer
+    for(i <- 0 until 5) {
+      producer.send(messageContent1 + "-topic", messageContent1)
+      producer.send(messageContent2 + "$topic", messageContent2, ProducerRequest.RandomPartition)
+    }
+
+    producer.close
+    EasyMock.verify(basicProducer)
+
+  }
+
+  private def getMessageSetOfSize(messages: List[Message], counts: Int): ByteBufferMessageSet = {
+    // note: every message in the list fills the whole array, so only the last
+    // one survives; the tests always pass a single-message list
+    val messageList = new Array[Message](counts)
+    for(message <- messages) {
+      for(i <- 0 until counts) {
+        messageList(i) = message
+      }
+    }
+    new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
+  }
+
+  class StringSerializer extends Encoder[String] {
+    def toMessage(event: String):Message = new Message(event.getBytes)
+    def getTopic(event: String): String = event.concat("-topic")
+  }
+
+  class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) {
+    override def send(topic: String, messages: ByteBufferMessageSet): Unit = {
+      Thread.sleep(1000)
+    }
+    override def multiSend(produces: Array[ProducerRequest]) {
+      Thread.sleep(1000)
+    }
+  }
+}
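
All of these producer tests follow the standard EasyMock lifecycle: calls are recorded on the mock (each followed by expectLastCall), replay switches the mock from recording to checking, the code under test runs against it, and verify fails the test if any recorded call never happened. The skeleton, stripped of the producer specifics:

    import org.easymock.EasyMock
    import kafka.producer.SyncProducer

    val mock = EasyMock.createMock(classOf[SyncProducer])
    mock.close                // record: exactly one close() is expected
    EasyMock.expectLastCall
    EasyMock.replay(mock)     // stop recording, start checking

    mock.close                // exercise: the code under test calls close()

    EasyMock.verify(mock)     // fails if an expected call never happened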

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.producer
+
+import collection.immutable.SortedSet
+import java.util._
+import junit.framework.Assert._
+import kafka.cluster.Partition
+import kafka.common.NoBrokersForPartitionException
+import kafka.producer._
+import org.easymock.EasyMock
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+import scala.collection.immutable.List
+
+class ProducerMethodsTest extends JUnitSuite {
+
+  @Test
+  def producerThrowsNoBrokersException() = {
+    val props = new Properties
+    props.put("broker.list", "placeholder") // Need to fake out having specified one
+    val config = new ProducerConfig(props)
+    val mockPartitioner = EasyMock.createMock(classOf[Partitioner[String]])
+    val mockProducerPool = EasyMock.createMock(classOf[ProducerPool[String]])
+    val mockBrokerPartitionInfo = EasyMock.createMock(classOf[kafka.producer.BrokerPartitionInfo])
+
+    EasyMock.expect(mockBrokerPartitionInfo.getBrokerPartitionInfo("the_topic")).andReturn(SortedSet[Partition]())
+    EasyMock.replay(mockBrokerPartitionInfo)
+
+    val producer = new Producer[String, String](config, mockPartitioner, mockProducerPool, false, mockBrokerPartitionInfo)
+
+    try {
+      val producerData = new ProducerData[String, String]("the_topic", "the_key", List("the_datum"))
+      producer.send(producerData)
+      fail("Should have thrown a NoBrokersForPartitionException.")
+    } catch {
+      case nb: NoBrokersForPartitionException => assertTrue(nb.getMessage.contains("the_key"))
+    }
+
+  }
+}
\ No newline at end of file

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,689 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import async.{AsyncProducerConfig, AsyncProducer}
+import java.util.Properties
+import org.apache.log4j.{Logger, Level}
+import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import kafka.zk.EmbeddedZookeeper
+import org.junit.{After, Before, Test}
+import junit.framework.Assert
+import collection.mutable.HashMap
+import org.easymock.EasyMock
+import java.util.concurrent.ConcurrentHashMap
+import kafka.cluster.Partition
+import org.scalatest.junit.JUnitSuite
+import kafka.common.{InvalidConfigException, UnavailableProducerException, InvalidPartitionException}
+import kafka.utils.{TestUtils, TestZKUtils, Utils}
+import kafka.serializer.{StringEncoder, Encoder}
+import kafka.consumer.SimpleConsumer
+import kafka.api.FetchRequest
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+
+class ProducerTest extends JUnitSuite {
+  private val topic = "test-topic"
+  private val brokerId1 = 0
+  private val brokerId2 = 1  
+  private val port1 = 9098
+  private val port2 = 9099
+  private var server1: KafkaServer = null
+  private var server2: KafkaServer = null
+  private var producer1: SyncProducer = null
+  private var producer2: SyncProducer = null
+  private var consumer1: SimpleConsumer = null
+  private var consumer2: SimpleConsumer = null
+  private var zkServer:EmbeddedZookeeper = null
+  private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+
+  @Before
+  def setUp() {
+    // set up 2 brokers with 4 partitions each
+    zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect)
+
+    val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+    val config1 = new KafkaConfig(props1) {
+      override val numPartitions = 4
+    }
+    server1 = TestUtils.createServer(config1)
+
+    val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+    val config2 = new KafkaConfig(props2) {
+      override val numPartitions = 4
+    }
+    server2 = TestUtils.createServer(config2)
+
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", port1.toString)
+
+    producer1 = new SyncProducer(new SyncProducerConfig(props))
+    producer1.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                          messages = new Message("test".getBytes())))
+
+    producer2 = new SyncProducer(new SyncProducerConfig(props) {
+      override val port = port2
+    })
+    producer2.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                          messages = new Message("test".getBytes())))
+
+    consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024)
+    consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024)
+
+    // temporarily set request handler logger to a higher level
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    Thread.sleep(500)
+  }
+
+  @After
+  def tearDown() {
+    // restore the request handler logger to its normal level
+    requestHandlerLogger.setLevel(Level.ERROR)
+    server1.shutdown
+    server2.shutdown
+    Utils.rm(server1.config.logDir)
+    Utils.rm(server2.config.logDir)    
+    Thread.sleep(500)
+    zkServer.shutdown
+    Thread.sleep(500)
+  }
+
+  @Test
+  def testSend() {
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    val config = new ProducerConfig(props)
+    val partitioner = new StaticPartitioner
+    val serializer = new StringSerializer
+
+    // 2 sync producers
+    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
+    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
+    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
+    // it should send to partition 0 (first partition) on second broker i.e broker2
+    syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes)))
+    EasyMock.expectLastCall
+    syncProducer1.close
+    EasyMock.expectLastCall
+    syncProducer2.close
+    EasyMock.expectLastCall
+    EasyMock.replay(syncProducer1)
+    EasyMock.replay(syncProducer2)
+
+    syncProducers.put(brokerId1, syncProducer1)
+    syncProducers.put(brokerId2, syncProducer2)
+
+    val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
+
+    producer.send(new ProducerData[String, String](topic, "test", Array("test1")))
+    producer.close
+
+    EasyMock.verify(syncProducer1)
+    EasyMock.verify(syncProducer2)
+  }
+
+  @Test
+  def testSendSingleMessage() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("broker.list", "0:localhost:9092")
+
+    val config = new ProducerConfig(props)
+    val partitioner = new StaticPartitioner
+    val serializer = new StringSerializer
+
+    // 2 sync producers
+    val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+    val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    // it should send to random partition on broker 1
+    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
+    EasyMock.expectLastCall
+    syncProducer1.close
+    EasyMock.expectLastCall
+    EasyMock.replay(syncProducer1)
+
+    syncProducers.put(brokerId1, syncProducer1)
+
+    val producerPool = new ProducerPool[String](config, serializer, syncProducers,
+      new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
+
+    producer.send(new ProducerData[String, String](topic, "t"))
+    producer.close
+
+    EasyMock.verify(syncProducer1)
+  }
+
+  @Test
+  def testInvalidPartition() {
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    val config = new ProducerConfig(props)
+
+    val richProducer = new Producer[String, String](config)
+    try {
+      richProducer.send(new ProducerData[String, String](topic, "test", Array("test")))
+      Assert.fail("Should fail with InvalidPartitionException")
+    }catch {
+      case e: InvalidPartitionException => // expected, do nothing
+    }
+  }
+
+  @Test
+  def testDefaultEncoder() {
+    val props = new Properties()
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    val config = new ProducerConfig(props)
+
+    val stringProducer1 = new Producer[String, String](config)
+    try {
+      stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test")))
+      fail("Should fail with ClassCastException due to incompatible Encoder")
+    } catch {
+      case e: ClassCastException =>
+    }
+
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    val stringProducer2 = new Producer[String, String](new ProducerConfig(props))
+    stringProducer2.send(new ProducerData[String, String](topic, "test", Array("test")))
+
+    val messageProducer1 = new Producer[String, Message](config)
+    try {
+      messageProducer1.send(new ProducerData[String, Message](topic, "test", Array(new Message("test".getBytes))))
+    } catch {
+      case e: ClassCastException => fail("Should not fail with ClassCastException due to default Encoder")
+    }
+  }
+
+  @Test
+  def testSyncProducerPool() {
+    // 2 sync producers
+    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
+    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
+    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
+    syncProducer1.send("test-topic", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes)))
+    EasyMock.expectLastCall
+    syncProducer1.close
+    EasyMock.expectLastCall
+    syncProducer2.close
+    EasyMock.expectLastCall
+    EasyMock.replay(syncProducer1)
+    EasyMock.replay(syncProducer2)
+
+    syncProducers.put(brokerId1, syncProducer1)
+    syncProducers.put(brokerId2, syncProducer2)
+
+    // default for producer.type is "sync"
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
+      syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
+
+    producerPool.close
+    EasyMock.verify(syncProducer1)
+    EasyMock.verify(syncProducer2)
+  }
+
+  @Test
+  def testAsyncProducerPool() {
+    // 2 async producers
+    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
+    val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
+    asyncProducer1.send(topic, "test1", 0)
+    EasyMock.expectLastCall
+    asyncProducer1.close
+    EasyMock.expectLastCall
+    asyncProducer2.close
+    EasyMock.expectLastCall
+    EasyMock.replay(asyncProducer1)
+    EasyMock.replay(asyncProducer2)
+
+    asyncProducers.put(brokerId1, asyncProducer1)
+    asyncProducers.put(brokerId2, asyncProducer2)
+
+    // change producer.type to "async"
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("producer.type", "async")
+    val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
+      new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
+    producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
+
+    producerPool.close
+    EasyMock.verify(asyncProducer1)
+    EasyMock.verify(asyncProducer2)
+  }
+
+  @Test
+  def testSyncUnavailableProducerException() {
+    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
+    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
+    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
+    syncProducer2.close
+    EasyMock.expectLastCall
+    EasyMock.replay(syncProducer1)
+    EasyMock.replay(syncProducer2)
+
+    syncProducers.put(brokerId2, syncProducer2)
+
+    // default for producer.type is "sync"
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
+      syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    try {
+      producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
+      Assert.fail("Should fail with UnavailableProducerException")
+    }catch {
+      case e: UnavailableProducerException => // expected
+    }
+
+    producerPool.close
+    EasyMock.verify(syncProducer1)
+    EasyMock.verify(syncProducer2)
+  }
+
+  @Test
+  def testAsyncUnavailableProducerException() {
+    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
+    val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
+    asyncProducer2.close
+    EasyMock.expectLastCall
+    EasyMock.replay(asyncProducer1)
+    EasyMock.replay(asyncProducer2)
+
+    asyncProducers.put(brokerId2, asyncProducer2)
+
+    // change producer.type to "async"
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("producer.type", "async")
+    val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
+      new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
+    try {
+      producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
+      Assert.fail("Should fail with UnavailableProducerException")
+    }catch {
+      case e: UnavailableProducerException => // expected
+    }
+
+    producerPool.close
+    EasyMock.verify(asyncProducer1)
+    EasyMock.verify(asyncProducer2)
+  }
+
+  @Test
+  def testConfigBrokerPartitionInfoWithPartitioner() {
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("producer.type", "async")
+    props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1 + ":" + 4 + "," +
+                                       brokerId2 + ":" + "localhost" + ":" + port2 + ":" + 4)
+
+    var config: ProducerConfig = null
+    try {
+      config = new ProducerConfig(props)
+      fail("should fail with InvalidConfigException due to presence of partitioner.class and broker.list")
+    }catch {
+      case e: InvalidConfigException => // expected
+    }
+  }
+
+  @Test
+  def testConfigBrokerPartitionInfo() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("producer.type", "async")
+    props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
+
+    val config = new ProducerConfig(props)
+    val partitioner = new StaticPartitioner
+    val serializer = new StringSerializer
+
+    // 2 async producers
+    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
+    // broker.list only contains broker 1, so it should send to a random partition (-1) on broker 1
+    asyncProducer1.send(topic, "test1", -1)
+    EasyMock.expectLastCall
+    asyncProducer1.close
+    EasyMock.expectLastCall
+    EasyMock.replay(asyncProducer1)
+
+    asyncProducers.put(brokerId1, asyncProducer1)
+
+    val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
+
+    producer.send(new ProducerData[String, String](topic, "test1", Array("test1")))
+    producer.close
+
+    EasyMock.verify(asyncProducer1)
+  }
+
+  @Test
+  def testZKSendToNewTopic() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+    val serializer = new StringEncoder
+
+    val producer = new Producer[String, String](config)
+    try {
+      // Available broker id, partition id at this stage should be (0,0), (1,0)
+      // this should send the message to broker 0 on partition 0
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      Thread.sleep(100)
+      // Available broker id, partition id at this stage should be (0,0), (0,1), (0,2), (0,3), (1,0)
+      // Since 4 % 5 = 4, this should send the message to broker 1 on partition 0
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      Thread.sleep(100)
+      // cross check if brokers got the messages
+      val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+      Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+      Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
+      Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message)
+    } catch {
+      case e: Exception => fail("Not expected", e)
+    }
+    producer.close
+  }
+
+  @Test
+  def testZKSendWithDeadBroker() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+    val serializer = new StringEncoder
+
+    val producer = new Producer[String, String](config)
+    try {
+      // Available broker id, partition id at this stage should be (0,0), (1,0)
+      // Hence, this should send the message to broker 0 on partition 0
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      Thread.sleep(100)
+      // kill 2nd broker
+      server2.shutdown
+      Thread.sleep(100)
+      // Available broker id, partition id at this stage should be (0,0), (0,1), (0,2), (0,3), (1,0)
+      // Since 4 % 5 = 4, in a normal case, it would send to broker 1 on partition 0. But since broker 1 is down,
+      // 4 % 4 = 0, So it should send the message to broker 0 on partition 0
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      Thread.sleep(100)
+      // cross check if brokers got the messages
+      val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+      Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      Assert.assertTrue("Message set should have another message", messageSet1.hasNext)
+      Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+    } catch {
+      case e: Exception => fail("Not expected")
+    }
+    producer.close
+  }
+
+  @Test
+  def testZKSendToExistingTopicWithNoBrokers() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+    val serializer = new StringEncoder
+
+    val producer = new Producer[String, String](config)
+    var server: KafkaServer = null
+
+    try {
+      // shutdown server1
+      server1.shutdown
+      Thread.sleep(100)
+      // Available broker id, partition id at this stage should be (1,0)
+      // this should send the message to broker 1 on partition 0
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
+      Thread.sleep(100)
+      // cross check if brokers got the messages
+      val messageSet1 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+      Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      Assert.assertEquals(new Message("test".getBytes), messageSet1.next.message)
+
+      // shutdown server2
+      server2.shutdown
+      Thread.sleep(100)
+      // delete the new-topic logs
+      Utils.rm(server2.config.logDir)
+      Thread.sleep(100)
+      // start it up again, so the broker re-registers under /brokers/ids but nothing exists under /brokers/topics/new-topic
+      val props2 = TestUtils.createBrokerConfig(brokerId1, port1)
+      val config2 = new KafkaConfig(props2) {
+        override val numPartitions = 4
+      }
+      server = TestUtils.createServer(config2)
+      Thread.sleep(100)
+
+      // now there are no brokers registered under new-topic.
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
+      Thread.sleep(100)
+
+      // cross check if brokers got the messages
+      val messageSet2 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+      Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
+      Assert.assertEquals(new Message("test".getBytes), messageSet2.next.message)
+
+    } catch {
+      case e: Exception => fail("Not expected", e)
+    }finally {
+      server.shutdown
+      producer.close
+    }
+  }
+
+  @Test
+  def testPartitionedSendToNewTopic() {
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+    val partitioner = new StaticPartitioner
+    val serializer = new StringSerializer
+
+    // 2 sync producers
+    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
+    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
+    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
+    syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                                  messages = new Message("test1".getBytes)))
+    EasyMock.expectLastCall
+    syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                                  messages = new Message("test1".getBytes)))
+    EasyMock.expectLastCall
+    syncProducer1.close
+    EasyMock.expectLastCall
+    syncProducer2.close
+    EasyMock.expectLastCall
+    EasyMock.replay(syncProducer1)
+    EasyMock.replay(syncProducer2)
+
+    syncProducers.put(brokerId1, syncProducer1)
+    syncProducers.put(brokerId2, syncProducer2)
+
+    val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
+
+    producer.send(new ProducerData[String, String]("test-topic1", "test", Array("test1")))
+    Thread.sleep(100)
+
+    // now send again to this topic using a real producer, this time all brokers would have registered
+    // their partitions in zookeeper
+    producer1.send("test-topic1", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                           messages = new Message("test".getBytes())))
+    Thread.sleep(100)
+
+    // wait for zookeeper to register the new topic
+    producer.send(new ProducerData[String, String]("test-topic1", "test1", Array("test1")))
+    Thread.sleep(100)
+    producer.close
+
+    EasyMock.verify(syncProducer1)
+    EasyMock.verify(syncProducer2)
+  }
+
+  @Test
+  def testPartitionedSendToNewBrokerInExistingTopic() {
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+    val partitioner = new StaticPartitioner
+    val serializer = new StringSerializer
+
+    // 2 sync producers
+    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
+    val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
+    val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
+    val syncProducer3 = EasyMock.createMock(classOf[SyncProducer])
+    syncProducer3.send("test-topic", 2, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                                 messages = new Message("test1".getBytes)))
+    EasyMock.expectLastCall
+    syncProducer1.close
+    EasyMock.expectLastCall
+    syncProducer2.close
+    EasyMock.expectLastCall
+    syncProducer3.close
+    EasyMock.expectLastCall
+    EasyMock.replay(syncProducer1)
+    EasyMock.replay(syncProducer2)
+    EasyMock.replay(syncProducer3)
+
+    syncProducers.put(brokerId1, syncProducer1)
+    syncProducers.put(brokerId2, syncProducer2)
+    syncProducers.put(2, syncProducer3)
+
+    val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
+
+    val serverProps = TestUtils.createBrokerConfig(2, 9094)
+    val serverConfig = new KafkaConfig(serverProps) {
+      override val numPartitions = 4
+    }
+    val server3 = TestUtils.createServer(serverConfig)
+    Thread.sleep(500)
+
+    // send a message to the new broker to register it under topic "test-topic"
+    val tempProps = new Properties()
+    tempProps.put("host", "localhost")
+    tempProps.put("port", "9094")
+    val tempProducer = new SyncProducer(new SyncProducerConfig(tempProps))
+    tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                             messages = new Message("test".getBytes())))
+
+    Thread.sleep(500)
+
+    producer.send(new ProducerData[String, String]("test-topic", "test-topic", Array("test1")))
+    producer.close
+
+    EasyMock.verify(syncProducer1)
+    EasyMock.verify(syncProducer2)
+    EasyMock.verify(syncProducer3)
+
+    server3.shutdown
+    Utils.rm(server3.config.logDir)
+  }
+
+  @Test
+  def testDefaultPartitioner() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("producer.type", "async")
+    props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
+    val config = new ProducerConfig(props)
+    val partitioner = new DefaultPartitioner[String]
+    val serializer = new StringSerializer
+
+    // 2 async producers
+    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
+    // it should send to a random partition (-1) on broker 1
+    asyncProducer1.send(topic, "test1", -1)
+    EasyMock.expectLastCall
+    asyncProducer1.close
+    EasyMock.expectLastCall
+    EasyMock.replay(asyncProducer1)
+
+    asyncProducers.put(brokerId1, asyncProducer1)
+
+    val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
+
+    producer.send(new ProducerData[String, String](topic, "test", Array("test1")))
+    producer.close
+
+    EasyMock.verify(asyncProducer1)
+  }
+}
+
+class StringSerializer extends Encoder[String] {
+  def toEvent(message: Message):String = message.toString
+  def toMessage(event: String):Message = new Message(event.getBytes)
+  def getTopic(event: String): String = event.concat("-topic")
+}
+
+class NegativePartitioner extends Partitioner[String] {
+  def partition(data: String, numPartitions: Int): Int = {
+    -1
+  }
+}
+
+class StaticPartitioner extends Partitioner[String] {
+  def partition(data: String, numPartitions: Int): Int = {
+    (data.length % numPartitions)
+  }
+}
+
+class HashPartitioner extends Partitioner[String] {
+  def partition(data: String, numPartitions: Int): Int = {
+    (data.hashCode % numPartitions)
+  }
+}
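
The helper partitioners above make the routing in these tests deterministic. StaticPartitioner partitions by key length, so with the 8 partitions registered in setUp (2 brokers with 4 partitions each, broker 0 owning global slots 0-3 and broker 1 owning slots 4-7) the key "test" maps to slot 4, i.e. partition 0 on the second broker, which is what the comment in testSend predicts. The arithmetic as a quick check:

    // StaticPartitioner: partition = data.length % numPartitions
    val numPartitions = 8                     // 2 brokers x 4 partitions each (per setUp)
    val slot = "test".length % numPartitions  // = 4
    // slots 0-3 live on broker 0, slots 4-7 on broker 1,
    // so slot 4 is partition 0 on the second broker
    assert(slot == 4)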

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer
+
+import junit.framework.Assert
+import kafka.utils.SystemTime
+import kafka.utils.TestUtils
+import kafka.server.{KafkaServer, KafkaConfig}
+import org.apache.log4j.{Logger, Level}
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import kafka.common.MessageSizeTooLargeException
+import java.util.Properties
+import kafka.api.ProducerRequest
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+
+class SyncProducerTest extends JUnitSuite {
+  private val messageBytes = new Array[Byte](2)
+  private var server: KafkaServer = null
+  val simpleProducerLogger = Logger.getLogger(classOf[SyncProducer])
+
+  @Before
+  def setUp() {
+    server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, 9092)) {
+      override val enableZookeeper = false
+    })
+  }
+
+  @After
+  def tearDown() {
+    server.shutdown
+  }
+
+  @Test
+  def testUnreachableServer() {
+    val props = new Properties()
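+    // a host name that should not resolve, so every send attempt is expected to fail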
+    props.put("host", "NOT_USED")
+    props.put("port", "9092")
+    props.put("buffer.size", "102400")
+    props.put("connect.timeout.ms", "300")
+    props.put("reconnect.interval", "1000")
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    var failed = false
+    val firstStart = SystemTime.milliseconds
+
+    // temporarily raise the log4j level so the expected connection errors do not clutter the output
+    simpleProducerLogger.setLevel(Level.FATAL)
+    try {
+      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+    } catch {
+      case e: Exception => failed = true
+    }
+    Assert.assertTrue(failed)
+    failed = false
+    val firstEnd = SystemTime.milliseconds
+    println("First message send retries took " + (firstEnd - firstStart) + " ms")
+    Assert.assertTrue((firstEnd - firstStart) < 300)
+
+    val secondStart = SystemTime.milliseconds
+    try {
+      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+    } catch {
+      case e: Exception => failed = true
+    }
+    Assert.assertTrue(failed)
+    val secondEnd = SystemTime.milliseconds
+    println("Second message send retries took " + (secondEnd - secondStart) + " ms")
+    Assert.assertTrue((secondEnd - secondStart) < 300)
+    simpleProducerLogger.setLevel(Level.ERROR)
+  }
+
+  @Test
+  def testReachableServer() {
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9092")
+    props.put("buffer.size", "102400")
+    props.put("connect.timeout.ms", "500")
+    props.put("reconnect.interval", "1000")
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    var failed = false
+    val firstStart = SystemTime.milliseconds
+    try {
+      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+    } catch {
+      case e: Exception => failed = true
+    }
+    Assert.assertFalse(failed)
+    failed = false
+    val firstEnd = SystemTime.milliseconds
+    Assert.assertTrue((firstEnd - firstStart) < 500)
+    val secondStart = SystemTime.milliseconds
+    try {
+      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+    } catch {
+      case e: Exception => failed = true
+    }
+    Assert.assertFalse(failed)
+    val secondEnd = SystemTime.milliseconds
+    Assert.assertTrue((secondEnd - secondStart) < 500)
+
+    try {
+      producer.multiSend(Array(new ProducerRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))))
+    } catch {
+      case e: Exception => failed = true
+    }
+    Assert.assertFalse(failed)
+  }
+
+  @Test
+  def testReachableServerWrongPort() {
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9091")
+    props.put("buffer.size", "102400")
+    props.put("connect.timeout.ms", "300")
+    props.put("reconnect.interval", "500")
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    var failed = false
+    val firstStart = SystemTime.milliseconds
+    // temporarily raise the log4j level so the expected connection errors do not clutter the output
+    simpleProducerLogger.setLevel(Level.FATAL)
+    try {
+      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+    } catch {
+      case e: Exception => failed = true
+    }
+    Assert.assertTrue(failed)
+    failed = false
+    val firstEnd = SystemTime.milliseconds
+    Assert.assertTrue((firstEnd - firstStart) < 300)
+    val secondStart = SystemTime.milliseconds
+    try {
+      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+    } catch {
+      case e: Exception => failed = true
+    }
+    Assert.assertTrue(failed)
+    val secondEnd = SystemTime.milliseconds
+    Assert.assertTrue((secondEnd - secondStart) < 300)
+    simpleProducerLogger.setLevel(Level.ERROR)
+  }
+
+  @Test
+  def testMessageSizeTooLarge() {
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9091")
+    props.put("buffer.size", "102400")
+    props.put("connect.timeout.ms", "300")
+    props.put("reconnect.interval", "500")
+    props.put("max.message.size", "100")
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+    val bytes = new Array[Byte](101)
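+    // one byte over the configured max.message.size of 100, so the send should be rejected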
+    var failed = false
+    try {
+      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(bytes)))
+    } catch {
+      case e: MessageSizeTooLargeException => failed = true
+    }
+    Assert.assertTrue(failed)
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import java.io.File
+import kafka.utils.Utils
+import kafka.api.FetchRequest
+import kafka.integration.ProducerConsumerTestHarness
+import kafka.producer.{SyncProducer, SyncProducerConfig}
+import kafka.consumer.SimpleConsumer
+import java.util.Properties
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import junit.framework.Assert._
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+
+class ServerShutdownTest extends JUnitSuite {
+  val port = 9999
+
+  @Test
+  def testCleanShutdown() {
+    val props = TestUtils.createBrokerConfig(0, port)
+    val config = new KafkaConfig(props) {
+      override val enableZookeeper = false
+    }
+
+    val host = "localhost"
+    val topic = "test"
+    val sent1 = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+    val sent2 = new ByteBufferMessageSet(NoCompressionCodec, new Message("more".getBytes()), new Message("messages".getBytes()))
+
+    {
+      val producer = new SyncProducer(getProducerConfig(host,
+                                                        port,
+                                                        64*1024,
+                                                        100000,
+                                                        10000))
+      val consumer = new SimpleConsumer(host,
+                                        port,
+                                        1000000,
+                                        64*1024)
+
+      val server = new KafkaServer(config)
+      server.startup()
+
+      // send some messages
+      producer.send(topic, sent1)
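+      // sending advances the buffer position, so rewind to allow iterating the set again below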
+      sent1.getBuffer.rewind
+
+      Thread.sleep(200)
+      // do a clean shutdown
+      server.shutdown()
+      val cleanShutDownFile = new File(new File(config.logDir), server.CLEAN_SHUTDOWN_FILE)
+      assertTrue(cleanShutDownFile.exists)
+    }
+
+
+    {
+      val producer = new SyncProducer(getProducerConfig(host,
+                                                        port,
+                                                        64*1024,
+                                                        100000,
+                                                        10000))
+      val consumer = new SimpleConsumer(host,
+                                        port,
+                                        1000000,
+                                        64*1024)
+
+      val server = new KafkaServer(config)
+      server.startup()
+
+      // bring the server back again and read the messages
+      var fetched: ByteBufferMessageSet = null
+      while(fetched == null || fetched.validBytes == 0)
+        fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+      TestUtils.checkEquals(sent1.iterator, fetched.iterator)
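+      // offsets in Kafka are byte positions in the log, so the first batch's valid byte count is where the next fetch starts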
+      val newOffset = fetched.validBytes
+
+      // send some more messages
+      producer.send(topic, sent2)
+      sent2.getBuffer.rewind
+
+      Thread.sleep(200)
+
+      fetched = null
+      while(fetched == null || fetched.validBytes == 0)
+        fetched = consumer.fetch(new FetchRequest(topic, 0, newOffset, 10000))
+      TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetched.map(m => m.message).iterator)
+
+      server.shutdown()
+      Utils.rm(server.config.logDir)
+    }
+
+  }
+
+  private def getProducerConfig(host: String, port: Int, bufferSize: Int, connectTimeout: Int,
+                                reconnectInterval: Int): SyncProducerConfig = {
+    val props = new Properties()
+    props.put("host", host)
+    props.put("port", port.toString)
+    props.put("buffer.size", bufferSize.toString)
+    props.put("connect.timeout.ms", connectTimeout.toString)
+    props.put("reconnect.interval", reconnectInterval.toString)
+    new SyncProducerConfig(props)
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,294 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.io._
+import java.net._
+import java.nio._
+import java.nio.channels._
+import java.util.Random
+import java.util.Properties
+import junit.framework.Assert._
+import kafka.server._
+import kafka.producer._
+import kafka.message._
+import org.I0Itec.zkclient.ZkClient
+import kafka.consumer.ConsumerConfig
+
+/**
+ * Utility functions to help with testing
+ */
+object TestUtils {
+  
+  val Letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+  val Digits = "0123456789"
+  val LettersAndDigits = Letters + Digits
+  
+  /* A consistent random number generator to make tests repeatable */
+  val seededRandom = new Random(192348092834L)
+  val random = new Random()
+  
+  /**
+   * Choose a number of random available ports
+   */
+  def choosePorts(count: Int): List[Int] = {
+    val sockets = 
+      for(i <- 0 until count)
+        yield new ServerSocket(0)
+    val socketList = sockets.toList
+    val ports = socketList.map(_.getLocalPort)
+    socketList.map(_.close)
+    ports
+  }
+  
+  /**
+   * Choose an available port
+   */
+  def choosePort(): Int = choosePorts(1).head
+  
+  /**
+   * Create a temporary directory
+   */
+  def tempDir(): File = {
+    val ioDir = System.getProperty("java.io.tmpdir")
+    val f = new File(ioDir, "kafka-" + random.nextInt(1000000))
+    f.mkdirs()
+    f.deleteOnExit()
+    f
+  }
+  
+  /**
+   * Create a temporary file
+   */
+  def tempFile(): File = {
+    val f = File.createTempFile("kafka", ".tmp")
+    f.deleteOnExit()
+    f
+  }
+  
+  /**
+   * Create a temporary file and return an open file channel for this file
+   */
+  def tempChannel(): FileChannel = new RandomAccessFile(tempFile(), "rw").getChannel()
+  
+  /**
+   * Create a kafka server instance with appropriate test settings
+   * @param config The configuration of the server
+   */
+  def createServer(config: KafkaConfig): KafkaServer = {
+    val server = new KafkaServer(config)
+    server.startup()
+    server
+  }
+  
+  /**
+   * Create a test config for the given node id
+   */
+  def createBrokerConfigs(numConfigs: Int): List[Properties] = {
+    for((port, node) <- choosePorts(numConfigs).zipWithIndex)
+      yield createBrokerConfig(node, port)
+  }
+  
+  /**
+   * Create a test config for the given node id
+   */
+  def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+    val props = new Properties
+    props.put("brokerid", nodeId.toString)
+    props.put("port", port.toString)
+    props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
+    props.put("log.flush.interval", "1")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props
+  }
+  
+  /**
+   * Create a test config for a consumer
+   */
+  def createConsumerProperties(zkConnect: String, groupId: String, consumerId: String,
+                               consumerTimeout: Long = -1): Properties = {
+    val props = new Properties
+    props.put("zk.connect", zkConnect)
+    props.put("groupid", groupId)
+    props.put("consumerid", consumerId)
+    props.put("consumer.timeout.ms", consumerTimeout.toString)
+    props.put("zk.sessiontimeout.ms", "400")
+    props.put("zk.synctime.ms", "200")
+    props.put("autocommit.interval.ms", "1000")
+
+    props
+  }
+
+  /**
+   * Wrap the message in a message set
+   * @param payload The bytes of the message
+   */
+  def singleMessageSet(payload: Array[Byte]) = 
+    new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(payload))
+  
+  /**
+   * Generate an array of random bytes
+   * @param numBytes The size of the array
+   */
+  def randomBytes(numBytes: Int): Array[Byte] = {
+    val bytes = new Array[Byte](numBytes)
+    seededRandom.nextBytes(bytes)
+    bytes
+  }
+  
+  /**
+   * Generate a random string of letters and digits of the given length
+   * @param len The length of the string
+   * @return The random string
+   */
+  def randomString(len: Int): String = {
+    val b = new StringBuilder()
+    for(i <- 0 until len)
+      b.append(LettersAndDigits.charAt(seededRandom.nextInt(LettersAndDigits.length)))
+    b.toString
+  }
+
+  /**
+   * Check that the buffer content from buffer.position() to buffer.limit() is equal
+   */
+  def checkEquals(b1: ByteBuffer, b2: ByteBuffer) {
+    assertEquals("Buffers should have equal length", b1.limit - b1.position, b2.limit - b2.position)
+    for(i <- 0 until b1.limit - b1.position)
+      assertEquals("byte " + i + " byte not equal.", b1.get(b1.position + i), b2.get(b1.position + i))
+  }
+  
+  /**
+   * Throw an exception if the two iterators are of differing lengths or contain
+   * different messages on their Nth element
+   */
+  def checkEquals[T](expected: Iterator[T], actual: Iterator[T]) {
+    var length = 0
+    while(expected.hasNext && actual.hasNext) {
+      length += 1
+      assertEquals(expected.next, actual.next)
+    }
+    
+    if (expected.hasNext) {
+      var length1 = length
+      while (expected.hasNext) {
+        expected.next
+        length1 += 1
+      }
+      fail("Iterators have uneven length--first has more: " + length1 + " > " + length)
+    }
+
+    if (actual.hasNext) {
+      var length2 = length
+      while (actual.hasNext) {
+        actual.next
+        length2 += 1
+      }
+      fail("Iterators have uneven length--second has more: " + length2 + " > " + length)
+    }
+  }
+
+  /**
+   * Throw an exception if the iterator has a different length than expected
+   */
+  def checkLength[T](s1: Iterator[T], expectedLength: Integer) {
+    var n = 0
+    while (s1.hasNext) {
+      n += 1
+      s1.next
+    }
+    assertEquals(expectedLength, n)
+  }
+
+  /**
+   * Throw an exception if the two iterators are of differing lengths or contain
+   * different messages on their Nth element
+   */
+  def checkEquals[T](s1: java.util.Iterator[T], s2: java.util.Iterator[T]) {
+    while(s1.hasNext && s2.hasNext)
+      assertEquals(s1.next, s2.next)
+    assertFalse("Iterators have uneven length--first has more", s1.hasNext)
+    assertFalse("Iterators have uneven length--second has more", s2.hasNext)
+  }
+
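+  /**
+   * Chain several iterators into a single iterator that yields all their elements
+   * in order, e.g. stackedIterator(Iterator(1, 2), Iterator(3)) yields 1, 2, 3.
+   */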
+  def stackedIterator[T](s: Iterator[T]*): Iterator[T] = {
+    new Iterator[T] {
+      var cur: Iterator[T] = null
+      val topIterator = s.iterator
+
+      def hasNext() : Boolean = {
+        while (true) {
+          if (cur == null) {
+            if (topIterator.hasNext)
+              cur = topIterator.next
+            else
+              return false
+          }
+          if (cur.hasNext)
+            return true
+          cur = null
+        }
+        // unreachable: the while loop above always returns
+        throw new RuntimeException("should not reach here")
+      }
+
+      def next() : T = cur.next
+    }
+  }
+
+  /**
+   * Create a hexadecimal string for the given bytes
+   */
+  def hexString(bytes: Array[Byte]): String = hexString(ByteBuffer.wrap(bytes))
+  
+  /**
+   * Create a hexadecimal string for the given bytes
+   */
+  def hexString(buffer: ByteBuffer): String = {
+    val builder = new StringBuilder("0x")
+    for(i <- 0 until buffer.limit)
+      builder.append(String.format("%x", Integer.valueOf(buffer.get(buffer.position + i))))
+    builder.toString
+  }
+  
+  /**
+   * Create a producer for the given host and port
+   */
+  def createProducer(host: String, port: Int): SyncProducer = {
+    val props = new Properties()
+    props.put("host", host)
+    props.put("port", port.toString)
+    props.put("buffer.size", "65536")
+    props.put("connect.timeout.ms", "100000")
+    props.put("reconnect.interval", "10000")
+    new SyncProducer(new SyncProducerConfig(props))
+  }
+
+  def updateConsumerOffset(config: ConsumerConfig, path: String, offset: Long) = {
+    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+    ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
+    // close the client so tests do not leak ZooKeeper connections
+    zkClient.close()
+  }
+}
+
+object TestZKUtils {
+  val zookeeperConnect = "127.0.0.1:2182"  
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.apache.log4j.Logger
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+
+class UtilsTest extends JUnitSuite {
+  
+  private val logger = Logger.getLogger(classOf[UtilsTest]) 
+
+  @Test
+  def testSwallow() {
+    Utils.swallow(logger.info, throw new IllegalStateException("test"))
+  }
+  
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import org.apache.zookeeper.server.ZooKeeperServer
+import org.apache.zookeeper.server.NIOServerCnxn
+import kafka.utils.TestUtils
+import org.I0Itec.zkclient.ZkClient
+import java.net.InetSocketAddress
+import kafka.utils.{Utils, StringSerializer}
+
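+/**
+ * An in-process ZooKeeper server for tests. It stores snapshots and logs in
+ * temporary directories and listens on the port parsed from connectString
+ * (e.g. "127.0.0.1:2182").
+ */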
+class EmbeddedZookeeper(val connectString: String) {
+  val snapshotDir = TestUtils.tempDir()
+  val logDir = TestUtils.tempDir()
+  val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 3000)
+  val port = connectString.split(":")(1).toInt
+  val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
+  factory.startup(zookeeper)
+  val client = new ZkClient(connectString)
+  client.setZkSerializer(StringSerializer)
+
+  def shutdown() {
+    factory.shutdown()
+    Utils.rm(logDir)
+    Utils.rm(snapshotDir)
+  }
+  
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import kafka.consumer.ConsumerConfig
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZkUtils, StringSerializer}
+import kafka.utils.{TestZKUtils, TestUtils}
+import org.junit.Assert
+import org.scalatest.junit.JUnit3Suite
+
+class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val zkConnect = TestZKUtils.zookeeperConnect
+  val zkSessionTimeoutMs = 1000
+
+  def testEphemeralNodeCleanup = {
+    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
+    var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+                                StringSerializer)
+
+    try {
+      ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
+    } catch {
+      case e: Exception => Assert.fail("Exception while creating ephemeral node: " + e)
+    }
+
+    val testData = ZkUtils.readData(zkClient, "/tmp/zktest")
+    Assert.assertNotNull(testData)
+
+    zkClient.close
+
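+    // wait out the session timeout so ZooKeeper removes the ephemeral node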
+    Thread.sleep(zkSessionTimeoutMs)
+
+    zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+                            StringSerializer)
+
+    val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest")
+    Assert.assertFalse(nodeExists)
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import junit.framework.Assert._
+import java.util.Collections
+import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
+import java.lang.Thread
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{TestUtils, ZkUtils, ZKGroupTopicDirs, TestZKUtils}
+
+class ZKLoadBalanceTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val zkConnect = TestZKUtils.zookeeperConnect
+  var dirs : ZKGroupTopicDirs = null
+  val topic = "topic1"
+  val group = "group1"
+  val firstConsumer = "consumer1"
+  val secondConsumer = "consumer2"
+
+  override def setUp() {
+    super.setUp()
+
+    dirs = new ZKGroupTopicDirs(group, topic)
+  }
+
+  def testLoadBalance() {
+    // create the first partition
+    ZkUtils.setupPartition(zookeeper.client, 400, "broker1", 1111, "topic1", 1)
+    // add the first consumer
+    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, firstConsumer))
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, false)
+    zkConsumerConnector1.createMessageStreams(Map(topic -> 1))
+
+    {
+      // check Partition Owner Registry
+      val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
+      val expected_1 = List( ("400-0", "group1_consumer1-0") )
+      checkSetEqual(actual_1, expected_1)
+    }
+
+    // add a second consumer
+    val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, secondConsumer))
+    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, false)
+    zkConsumerConnector2.createMessageStreams(Map(topic -> 1))
+    // wait a bit to make sure rebalancing logic is triggered
+    Thread.sleep(200)
+
+    {
+      // check Partition Owner Registry
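+      // with only one partition available, the first consumer should retain sole ownership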
+      val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
+      val expected_2 = List( ("400-0", "group1_consumer1-0") )
+      checkSetEqual(actual_2, expected_2)
+    }
+
+    {
+      // add a few more partitions
+      val brokers = List(
+        (200, "broker2", 1111, "topic1", 2),
+        (300, "broker3", 1111, "topic1", 2) )
+
+      for ((brokerID, host, port, topic, nParts) <- brokers)
+        ZkUtils.setupPartition(zookeeper.client, brokerID, host, port, topic, nParts)
+
+      // wait a bit to make sure rebalancing logic is triggered
+      Thread.sleep(1000)
+      // check Partition Owner Registry
+      val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
+      val expected_3 = List( ("200-0", "group1_consumer1-0"),
+                             ("200-1", "group1_consumer1-0"),
+                             ("300-0", "group1_consumer1-0"),
+                             ("300-1", "group1_consumer2-0"),
+                             ("400-0", "group1_consumer2-0") )
+      checkSetEqual(actual_3, expected_3)
+    }
+
+    {
+      // now delete a partition
+      ZkUtils.deletePartition(zookeeper.client, 400, "topic1")
+
+      // wait a bit to make sure rebalancing logic is triggered
+      Thread.sleep(500)
+      // check Partition Owner Registry
+      val actual_4 = getZKChildrenValues(dirs.consumerOwnerDir)
+      val expected_4 = List( ("200-0", "group1_consumer1-0"),
+                             ("200-1", "group1_consumer1-0"),
+                             ("300-0", "group1_consumer2-0"),
+                             ("300-1", "group1_consumer2-0") )
+      checkSetEqual(actual_4, expected_4)
+    }
+
+    zkConsumerConnector1.shutdown
+    zkConsumerConnector2.shutdown
+  }
+
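+  // read each child of the given ZooKeeper path together with its value, sorted by child name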
+  private def getZKChildrenValues(path: String): Seq[(String, String)] = {
+    import scala.collection.JavaConversions
+    val children = zookeeper.client.getChildren(path)
+    Collections.sort(children)
+    val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
+    childrenAsSeq.map(partition =>
+      (partition, zookeeper.client.readData(path + "/" + partition).asInstanceOf[String]))
+  }
+
+  private def checkSetEqual(actual: Seq[(String, String)], expected: Seq[(String, String)]) {
+    assertEquals(expected.length, actual.length)
+    for (i <- 0 until expected.length) {
+      assertEquals(expected(i)._1, actual(i)._1)
+      assertEquals(expected(i)._2, actual(i)._2)
+    }
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import org.scalatest.junit.JUnit3Suite
+
+trait ZooKeeperTestHarness extends JUnit3Suite {
+  val zkConnect: String
+  var zookeeper: EmbeddedZookeeper = null
+
+  override def setUp() {
+    zookeeper = new EmbeddedZookeeper(zkConnect)
+    super.setUp
+  }
+
+  override def tearDown() {
+    super.tearDown
+    zookeeper.shutdown()
+  }
+
+}