You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/02 01:42:17 UTC

svn commit: r1152970 [22/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,416 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.integration
+
+import scala.collection._
+import kafka.api.FetchRequest
+import kafka.common.{InvalidPartitionException, OffsetOutOfRangeException}
+import kafka.server.{KafkaRequestHandlers, KafkaConfig}
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.javaapi.ProducerRequest
+import kafka.utils.TestUtils
+import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message}
+
+/**
+ * End to end tests of the primitive apis against a local server
+ */
+class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with kafka.integration.KafkaServerTestHarness {
+  
+  val port = 9999
+  val props = TestUtils.createBrokerConfig(0, port)
+  val config = new KafkaConfig(props) {
+                 override val enableZookeeper = false
+               }
+  val configs = List(config)
+  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+
+  /**
+   * Sends an empty and then a two-message uncompressed message set to one topic,
+   * checks that each set can be fetched back unchanged, and finally checks that a
+   * fetch with an invalid offset (-1) raises OffsetOutOfRangeException.
+   */
+  def testProduceAndFetch() {
+    val topic = "test"
+
+    // send an empty messageset first
+    val sent2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                         messages = getMessageList(Seq.empty[Message]: _*))
+    producer.send(topic, sent2)
+    Thread.sleep(200)
+    sent2.getBuffer.rewind
+    val fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+    TestUtils.checkEquals(sent2.iterator, fetched2.iterator)
+
+    // send some messages
+    val sent3 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                         messages = getMessageList(new Message("hello".getBytes()),
+                                                                   new Message("there".getBytes())))
+    producer.send(topic, sent3)
+
+    Thread.sleep(200)
+    sent3.getBuffer.rewind
+    // keep fetching until the response actually contains data
+    // NOTE(review): this loop has no upper bound, so it spins forever if no data ever arrives
+    var fetched3: ByteBufferMessageSet = null
+    while(fetched3 == null || fetched3.validBytes == 0)
+      fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+    TestUtils.checkEquals(sent3.iterator, fetched3.iterator)
+
+    // temporarily set the request handler logger to FATAL
+    // (presumably to keep the expected error out of the test output)
+    requestHandlerLogger.setLevel(Level.FATAL)
+    try {
+      // send an invalid offset
+      try {
+        val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
+        // presumably the error only surfaces once the response is read - TODO confirm
+        fetchedWithError.iterator
+        fail("expect exception")
+      }
+      catch {
+        case e: OffsetOutOfRangeException => "this is good"
+      }
+    }
+    finally {
+      // put the logger back to ERROR even when the check above fails,
+      // so a failure here does not silence logging for the tests that follow
+      requestHandlerLogger.setLevel(Level.ERROR)
+    }
+  }
+
+  /**
+   * Same scenario as the uncompressed produce/fetch test, but every message set
+   * is built with DefaultCompressionCodec: an empty set and a two-message set are
+   * sent and fetched back, then a fetch with an invalid offset (-1) must raise
+   * OffsetOutOfRangeException.
+   */
+  def testProduceAndFetchWithCompression() {
+    val topic = "test"
+
+    // send an empty messageset first
+    val sent2 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+                                         messages = getMessageList(Seq.empty[Message]: _*))
+    producer.send(topic, sent2)
+    Thread.sleep(200)
+    sent2.getBuffer.rewind
+    val fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+    TestUtils.checkEquals(sent2.iterator, fetched2.iterator)
+
+    // send some messages
+    val sent3 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+                                         messages = getMessageList(new Message("hello".getBytes()),
+                                                                   new Message("there".getBytes())))
+    producer.send(topic, sent3)
+
+    Thread.sleep(200)
+    sent3.getBuffer.rewind
+    // keep fetching until the response actually contains data
+    // NOTE(review): this loop has no upper bound, so it spins forever if no data ever arrives
+    var fetched3: ByteBufferMessageSet = null
+    while(fetched3 == null || fetched3.validBytes == 0)
+      fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+    TestUtils.checkEquals(sent3.iterator, fetched3.iterator)
+
+    // temporarily set the request handler logger to FATAL
+    // (presumably to keep the expected error out of the test output)
+    requestHandlerLogger.setLevel(Level.FATAL)
+    try {
+      // send an invalid offset
+      try {
+        val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
+        // presumably the error only surfaces once the response is read - TODO confirm
+        fetchedWithError.iterator
+        fail("expect exception")
+      }
+      catch {
+        case e: OffsetOutOfRangeException => "this is good"
+      }
+    }
+    finally {
+      // put the logger back to ERROR even when the check above fails,
+      // so a failure here does not silence logging for the tests that follow
+      requestHandlerLogger.setLevel(Level.ERROR)
+    }
+  }
+
+  /**
+   * Sends two uncompressed messages to each of three topics and checks that a
+   * single multifetch returns them, one response per topic in request order.
+   * Then checks that a multifetch with invalid offsets (-1) raises
+   * OffsetOutOfRangeException and one with invalid partitions (-1) raises
+   * InvalidPartitionException.
+   */
+  def testProduceAndMultiFetch() {
+    val topics = List("test1", "test2", "test3");
+    {
+      // send some messages, remembering what went to which topic
+      val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+      val fetches = new mutable.ArrayBuffer[FetchRequest]
+      for(topic <- topics) {
+        val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                           messages = getMessageList(new Message(("a_" + topic).getBytes),
+                                                                     new Message(("b_" + topic).getBytes)))
+        messages += topic -> set
+        producer.send(topic, set)
+        set.getBuffer.rewind
+        fetches += new FetchRequest(topic, 0, 0, 10000)
+      }
+
+      // wait a bit for produced message to be available
+      Thread.sleep(200)
+      val response = consumer.multifetch(getFetchRequestList(fetches: _*))
+      val iter = response.iterator
+      for(topic <- topics) {
+        if (iter.hasNext) {
+          val resp = iter.next
+          TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+        }
+        else
+          fail("fewer responses than expected")
+      }
+    }
+
+    // temporarily set the request handler logger to FATAL
+    // (presumably to keep the expected errors out of the test output)
+    requestHandlerLogger.setLevel(Level.FATAL)
+    try {
+      {
+        // send some invalid offsets
+        val fetches = new mutable.ArrayBuffer[FetchRequest]
+        for(topic <- topics)
+          fetches += new FetchRequest(topic, 0, -1, 10000)
+
+        try {
+          val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
+          val iter = responses.iterator
+          // walk every response; presumably the error only surfaces on read - TODO confirm
+          while (iter.hasNext)
+            iter.next.iterator
+          fail("expect exception")
+        }
+        catch {
+          case e: OffsetOutOfRangeException => "this is good"
+        }
+      }
+
+      {
+        // send some invalid partitions
+        val fetches = new mutable.ArrayBuffer[FetchRequest]
+        for(topic <- topics)
+          fetches += new FetchRequest(topic, -1, 0, 10000)
+
+        try {
+          val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
+          val iter = responses.iterator
+          while (iter.hasNext)
+            iter.next.iterator
+          fail("expect exception")
+        }
+        catch {
+          case e: InvalidPartitionException => "this is good"
+        }
+      }
+    }
+    finally {
+      // put the logger back to ERROR even when one of the checks above fails,
+      // so a failure here does not silence logging for the tests that follow
+      requestHandlerLogger.setLevel(Level.ERROR)
+    }
+  }
+
+  /**
+   * Same scenario as the uncompressed multifetch test, but the message sets are
+   * built with DefaultCompressionCodec: two messages go to each of three topics,
+   * a single multifetch must return them in request order, and multifetches with
+   * invalid offsets (-1) or invalid partitions (-1) must raise
+   * OffsetOutOfRangeException and InvalidPartitionException respectively.
+   */
+  def testProduceAndMultiFetchWithCompression() {
+    val topics = List("test1", "test2", "test3");
+    {
+      // send some messages, remembering what went to which topic
+      val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+      val fetches = new mutable.ArrayBuffer[FetchRequest]
+      for(topic <- topics) {
+        val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+                                           messages = getMessageList(new Message(("a_" + topic).getBytes),
+                                                                     new Message(("b_" + topic).getBytes)))
+        messages += topic -> set
+        producer.send(topic, set)
+        set.getBuffer.rewind
+        fetches += new FetchRequest(topic, 0, 0, 10000)
+      }
+
+      // wait a bit for produced message to be available
+      Thread.sleep(200)
+      val response = consumer.multifetch(getFetchRequestList(fetches: _*))
+      val iter = response.iterator
+      for(topic <- topics) {
+        if (iter.hasNext) {
+          val resp = iter.next
+          TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+        }
+        else
+          fail("fewer responses than expected")
+      }
+    }
+
+    // temporarily set the request handler logger to FATAL
+    // (presumably to keep the expected errors out of the test output)
+    requestHandlerLogger.setLevel(Level.FATAL)
+    try {
+      {
+        // send some invalid offsets
+        val fetches = new mutable.ArrayBuffer[FetchRequest]
+        for(topic <- topics)
+          fetches += new FetchRequest(topic, 0, -1, 10000)
+
+        try {
+          val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
+          val iter = responses.iterator
+          // walk every response; presumably the error only surfaces on read - TODO confirm
+          while (iter.hasNext)
+            iter.next.iterator
+          fail("expect exception")
+        }
+        catch {
+          case e: OffsetOutOfRangeException => "this is good"
+        }
+      }
+
+      {
+        // send some invalid partitions
+        val fetches = new mutable.ArrayBuffer[FetchRequest]
+        for(topic <- topics)
+          fetches += new FetchRequest(topic, -1, 0, 10000)
+
+        try {
+          val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
+          val iter = responses.iterator
+          while (iter.hasNext)
+            iter.next.iterator
+          fail("expect exception")
+        }
+        catch {
+          case e: InvalidPartitionException => "this is good"
+        }
+      }
+    }
+    finally {
+      // put the logger back to ERROR even when one of the checks above fails,
+      // so a failure here does not silence logging for the tests that follow
+      requestHandlerLogger.setLevel(Level.ERROR)
+    }
+  }
+
+  /**
+   * Sends two uncompressed messages to each of three topics, then issues one
+   * multifetch built from a java.util.ArrayList of requests and checks that the
+   * responses come back one per topic, in request order, with the sent content.
+   */
+  def testProduceAndMultiFetchJava() {
+    val topicNames = List("test1", "test2", "test3")
+    val sentByTopic = new mutable.HashMap[String, ByteBufferMessageSet]
+    val fetchRequests: java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest]
+
+    // produce to every topic and queue the matching fetch request
+    topicNames.foreach { topicName =>
+      val messageSet = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                messages = getMessageList(new Message(("a_" + topicName).getBytes),
+                                                                          new Message(("b_" + topicName).getBytes)))
+      sentByTopic += topicName -> messageSet
+      producer.send(topicName, messageSet)
+      messageSet.getBuffer.rewind
+      fetchRequests.add(new FetchRequest(topicName, 0, 0, 10000))
+    }
+
+    // give the broker a moment to make the produced messages available
+    Thread.sleep(200)
+    val responses = consumer.multifetch(fetchRequests).iterator
+    topicNames.foreach { topicName =>
+      // fail() throws, so the comparison below only runs when a response exists
+      if (!responses.hasNext)
+        fail("fewer responses than expected")
+      val fetched = responses.next
+      TestUtils.checkEquals(sentByTopic(topicName).iterator, fetched.iterator)
+    }
+  }
+
+  /**
+   * Sends two messages, packed with DefaultCompressionCodec, to each of three
+   * topics, then issues one multifetch built from a java.util.ArrayList of
+   * requests and checks that the responses come back one per topic, in request
+   * order, with the sent content.
+   */
+  def testProduceAndMultiFetchJavaWithCompression() {
+    val topicNames = List("test1", "test2", "test3")
+    val sentByTopic = new mutable.HashMap[String, ByteBufferMessageSet]
+    val fetchRequests: java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest]
+
+    // produce to every topic and queue the matching fetch request
+    topicNames.foreach { topicName =>
+      val messageSet = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+                                                messages = getMessageList(new Message(("a_" + topicName).getBytes),
+                                                                          new Message(("b_" + topicName).getBytes)))
+      sentByTopic += topicName -> messageSet
+      producer.send(topicName, messageSet)
+      messageSet.getBuffer.rewind
+      fetchRequests.add(new FetchRequest(topicName, 0, 0, 10000))
+    }
+
+    // give the broker a moment to make the produced messages available
+    Thread.sleep(200)
+    val responses = consumer.multifetch(fetchRequests).iterator
+    topicNames.foreach { topicName =>
+      // fail() throws, so the comparison below only runs when a response exists
+      if (!responses.hasNext)
+        fail("fewer responses than expected")
+      val fetched = responses.next
+      TestUtils.checkEquals(sentByTopic(topicName).iterator, fetched.iterator)
+    }
+  }
+
+  /**
+   * Builds one uncompressed two-message produce request per topic, sends them
+   * all with a single multiSend, and checks that a multifetch returns the sent
+   * content, one response per topic in request order.
+   */
+  def testMultiProduce() {
+    val topicNames = List("test1", "test2", "test3")
+    val sentByTopic = new mutable.HashMap[String, ByteBufferMessageSet]
+    val fetchRequests = new mutable.ArrayBuffer[FetchRequest]
+    var produceRequests: List[ProducerRequest] = Nil
+
+    topicNames.foreach { topicName =>
+      val messageSet = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                messages = getMessageList(new Message(("a_" + topicName).getBytes),
+                                                                          new Message(("b_" + topicName).getBytes)))
+      sentByTopic += topicName -> messageSet
+      // prepend, exactly as the ::= operator does
+      produceRequests = new ProducerRequest(topicName, 0, messageSet) :: produceRequests
+      fetchRequests += new FetchRequest(topicName, 0, 0, 10000)
+    }
+    producer.multiSend(produceRequests.toArray)
+
+    // rewind every sent buffer before it is iterated for the comparison below
+    sentByTopic.values.foreach(sentSet => sentSet.getBuffer.rewind)
+
+    // give the broker a moment to make the produced messages available
+    Thread.sleep(200)
+    val responses = consumer.multifetch(getFetchRequestList(fetchRequests: _*)).iterator
+    topicNames.foreach { topicName =>
+      // fail() throws, so the comparison below only runs when a response exists
+      if (!responses.hasNext)
+        fail("fewer responses than expected")
+      val fetched = responses.next
+      TestUtils.checkEquals(sentByTopic(topicName).iterator, fetched.iterator)
+    }
+  }
+
+  /**
+   * Builds one two-message produce request per topic, packed with
+   * DefaultCompressionCodec, sends them all with a single multiSend, and checks
+   * that a multifetch returns the sent content, one response per topic in
+   * request order.
+   */
+  def testMultiProduceWithCompression() {
+    val topicNames = List("test1", "test2", "test3")
+    val sentByTopic = new mutable.HashMap[String, ByteBufferMessageSet]
+    val fetchRequests = new mutable.ArrayBuffer[FetchRequest]
+    var produceRequests: List[ProducerRequest] = Nil
+
+    topicNames.foreach { topicName =>
+      val messageSet = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+                                                messages = getMessageList(new Message(("a_" + topicName).getBytes),
+                                                                          new Message(("b_" + topicName).getBytes)))
+      sentByTopic += topicName -> messageSet
+      // prepend, exactly as the ::= operator does
+      produceRequests = new ProducerRequest(topicName, 0, messageSet) :: produceRequests
+      fetchRequests += new FetchRequest(topicName, 0, 0, 10000)
+    }
+    producer.multiSend(produceRequests.toArray)
+
+    // rewind every sent buffer before it is iterated for the comparison below
+    sentByTopic.values.foreach(sentSet => sentSet.getBuffer.rewind)
+
+    // give the broker a moment to make the produced messages available
+    Thread.sleep(200)
+    val responses = consumer.multifetch(getFetchRequestList(fetchRequests: _*)).iterator
+    topicNames.foreach { topicName =>
+      // fail() throws, so the comparison below only runs when a response exists
+      if (!responses.hasNext)
+        fail("fewer responses than expected")
+      val fetched = responses.next
+      TestUtils.checkEquals(sentByTopic(topicName).iterator, fetched.iterator)
+    }
+  }
+
+  /** Copies the given messages, in order, into a freshly allocated java.util.ArrayList. */
+  private def getMessageList(messages: Message*): java.util.List[Message] = {
+    val javaList = new java.util.ArrayList[Message]()
+    for (message <- messages)
+      javaList.add(message)
+    javaList
+  }
+
+  /** Copies the given fetch requests, in order, into a freshly allocated java.util.ArrayList. */
+  private def getFetchRequestList(fetches: FetchRequest*): java.util.List[FetchRequest] = {
+    val javaList = new java.util.ArrayList[FetchRequest]()
+    for (request <- fetches)
+      javaList.add(request)
+    javaList
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.integration
+
+import org.scalatest.junit.JUnit3Suite
+import java.util.Properties
+import kafka.producer.SyncProducerConfig
+import kafka.javaapi.producer.SyncProducer
+import kafka.javaapi.consumer.SimpleConsumer
+
+/**
+ * Test mix-in that creates a SyncProducer and a SimpleConsumer pointed at
+ * `host`:`port` before each test and closes both after it.
+ * The concrete test class supplies `port`.
+ */
+trait ProducerConsumerTestHarness extends JUnit3Suite {
+
+  // port of the broker under test; defined by the class mixing this trait in
+  val port: Int
+  val host = "localhost"
+  // both clients are (re)created in setUp and closed in tearDown
+  var producer: SyncProducer = null
+  var consumer: SimpleConsumer = null
+
+  /**
+   * Builds the producer and the consumer, then delegates to the parent setUp.
+   */
+  override def setUp() {
+    val props = new Properties()
+    props.put("host", host)
+    props.put("port", port.toString)
+    props.put("buffer.size", "65536")
+    props.put("connect.timeout.ms", "100000")
+    props.put("reconnect.interval", "10000")
+    producer = new SyncProducer(new SyncProducerConfig(props))
+    // NOTE(review): the two trailing arguments look like a socket timeout and a
+    // buffer size - confirm against the SimpleConsumer constructor
+    consumer = new SimpleConsumer(host,
+                                  port,
+                                  1000000,
+                                  64*1024)
+    super.setUp
+  }
+
+  /**
+   * Runs the parent tearDown, then closes the producer and the consumer.
+   * Each step runs even if an earlier one throws, so one failing close cannot
+   * leak the other client's connection.
+   */
+  override def tearDown() {
+    try {
+      super.tearDown
+    }
+    finally {
+      try {
+        producer.close()
+      }
+      finally {
+        consumer.close()
+      }
+    }
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.message
+
+import junit.framework.Assert._
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+import kafka.utils.TestUtils
+import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, CompressionCodec, Message}
+
+/**
+ * Test cases shared by the javaapi message set implementations. A concrete suite
+ * supplies `createMessageSet`; the tests here exercise iteration and size
+ * reporting of whatever it returns.
+ */
+trait BaseMessageSetTestCases extends JUnitSuite {
+  
+  // fixture: two small messages used by most tests below
+  val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
+
+  /** Builds the message set under test from `messages`, using the codec given in `compressed`. */
+  def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
+
+  /** Returns a Scala iterator over the `message` of every entry of `messageSet`. */
+  def toMessageIterator(messageSet: MessageSet): Iterator[Message] = {
+    import scala.collection.JavaConversions._
+    val messages = asIterable(messageSet)
+    messages.map(m => m.message).iterator
+  }
+
+  /** Iterating a freshly created set must yield exactly the messages it was built from. */
+  @Test
+  def testWrittenEqualsRead() {
+    import scala.collection.JavaConversions._
+    val messageSet = createMessageSet(messages)
+    TestUtils.checkEquals(messages.iterator, toMessageIterator(messageSet))
+  }
+
+  @Test
+  def testIteratorIsConsistent() {
+    import scala.collection.JavaConversions._
+    val m = createMessageSet(messages)
+    // two iterators over the same set should give the same results
+    TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
+  }
+
+  @Test
+  def testIteratorIsConsistentWithCompression() {
+    import scala.collection.JavaConversions._
+    val m = createMessageSet(messages, DefaultCompressionCodec)
+    // two iterators over the same set should give the same results
+    TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
+  }
+
+  /** An empty set reports 0 bytes; a non-empty one reports the size predicted by MessageSet.messageSetSize. */
+  @Test
+  def testSizeInBytes() {
+    assertEquals("Empty message set should have 0 bytes.",
+                 0L,
+                 createMessageSet(Array[Message]()).sizeInBytes)
+    assertEquals("Predicted size should equal actual size.", 
+                 kafka.message.MessageSet.messageSetSize(messages).toLong,
+                 createMessageSet(messages).sizeInBytes)
+  }
+
+  /** An empty set built with DefaultCompressionCodec is expected to report 30 bytes. */
+  @Test
+  def testSizeInBytesWithCompression () {
+    // the failure message now names the value actually expected (30, not 0)
+    assertEquals("Empty compressed message set should have 30 bytes of compression overhead.",
+                 30L,           // overhead of the GZIP output stream
+                 createMessageSet(Array[Message](), DefaultCompressionCodec).sizeInBytes)
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.message
+
+import java.nio._
+import junit.framework.TestCase
+import junit.framework.Assert._
+import org.junit.Test
+import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
+
+/**
+ * Runs the shared message set test cases against the javaapi ByteBufferMessageSet
+ * and adds checks for validBytes and equals, each with and without compression.
+ */
+class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestCases {
+
+  override def createMessageSet(messages: Seq[Message],
+                                compressed: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet = {
+    val javaMessages = getMessageList(messages: _*)
+    new ByteBufferMessageSet(compressed, javaMessages)
+  }
+
+  /** Appending two extra bytes after the serialized messages must leave validBytes unchanged. */
+  @Test
+  def testValidBytes() {
+    val original = helloThereSet(NoCompressionCodec)
+    val padded = ByteBuffer.allocate(original.sizeInBytes.toInt + 2)
+    padded.put(original.getBuffer)
+    padded.putShort(4)
+    val originalPlusJunk = new ByteBufferMessageSet(padded)
+    assertEquals("Adding invalid bytes shouldn't change byte count", original.validBytes, originalPlusJunk.validBytes)
+  }
+
+  /** Same check as testValidBytes for a set built with DefaultCompressionCodec. */
+  @Test
+  def testValidBytesWithCompression () {
+    val original = helloThereSet(DefaultCompressionCodec)
+    val padded = ByteBuffer.allocate(original.sizeInBytes.toInt + 2)
+    padded.put(original.getBuffer)
+    padded.putShort(4)
+    val originalPlusJunk = new ByteBufferMessageSet(padded, 0, 0)
+    assertEquals("Adding invalid bytes shouldn't change byte count", original.validBytes, originalPlusJunk.validBytes)
+  }
+
+  /** Two sets built from the same messages must compare equal. */
+  @Test
+  def testEquals() {
+    val first = helloThereSet(NoCompressionCodec)
+    val second = helloThereSet(NoCompressionCodec)
+
+    assertEquals(first, second)
+    assertTrue(first.equals(second))
+  }
+
+  /** Same check as testEquals for sets built with DefaultCompressionCodec. */
+  @Test
+  def testEqualsWithCompression () {
+    val first = helloThereSet(DefaultCompressionCodec)
+    val second = helloThereSet(DefaultCompressionCodec)
+
+    assertEquals(first, second)
+    assertTrue(first.equals(second))
+  }
+
+  /** Builds a new set holding the messages "hello" and "there" with the given codec. */
+  private def helloThereSet(codec: CompressionCodec): ByteBufferMessageSet =
+    new ByteBufferMessageSet(compressionCodec = codec,
+                             messages = getMessageList(new Message("hello".getBytes()),
+                                                       new Message("there".getBytes())))
+
+  /** Copies the given messages, in order, into a freshly allocated java.util.ArrayList. */
+  private def getMessageList(messages: Message*): java.util.List[Message] = {
+    val javaList = new java.util.ArrayList[Message]()
+    for (message <- messages)
+      javaList.add(message)
+    javaList
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,630 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.javaapi.producer
+
+import java.util.Properties
+import org.apache.log4j.{Logger, Level}
+import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import kafka.zk.EmbeddedZookeeper
+import kafka.utils.{TestZKUtils, TestUtils}
+import org.junit.{After, Before, Test}
+import junit.framework.Assert
+import collection.mutable.HashMap
+import org.easymock.EasyMock
+import kafka.utils.Utils
+import java.util.concurrent.ConcurrentHashMap
+import kafka.cluster.Partition
+import kafka.common.{UnavailableProducerException, InvalidPartitionException, InvalidConfigException}
+import org.scalatest.junit.JUnitSuite
+import kafka.producer.{SyncProducerConfig, Partitioner, ProducerConfig, DefaultPartitioner}
+import kafka.producer.ProducerPool
+import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.producer.async.{AsyncProducer, AsyncProducerConfig}
+import kafka.javaapi.Implicits._
+import kafka.serializer.{StringEncoder, Encoder}
+import kafka.javaapi.consumer.SimpleConsumer
+import kafka.api.FetchRequest
+import kafka.message.{NoCompressionCodec, Message}
+
+class ProducerTest extends JUnitSuite {
+  // topic name used by the mocked-producer expectations in the tests below
+  private val topic = "test-topic"
+  // ids and ports of the two brokers started in setUp()
+  private val brokerId1 = 0
+  private val brokerId2 = 1  
+  private val port1 = 9092
+  private val port2 = 9093
+  // broker instances; created in setUp(), shut down in tearDown()
+  private var server1: KafkaServer = null
+  private var server2: KafkaServer = null
+  // sync producers created in setUp(): producer1 is configured with port1, producer2 with port2
+  private var producer1: SyncProducer = null
+  private var producer2: SyncProducer = null
+  // simple consumers created in setUp(): consumer1 on port1, consumer2 on port2
+  private var consumer1: SimpleConsumer = null
+  private var consumer2: SimpleConsumer = null
+  // embedded ZooKeeper instance; started in setUp(), stopped in tearDown()
+  private var zkServer:EmbeddedZookeeper = null
+  // logger of KafkaRequestHandlers; its level is set to FATAL in setUp() and to ERROR in tearDown()
+  private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+
+  /**
+   * Starts an embedded ZooKeeper and two brokers with 4 partitions each, sends
+   * one "test" message to "test-topic" through a sync producer per broker, and
+   * opens one simple consumer per broker.
+   */
+  @Before
+  def setUp() {
+    // embedded ZooKeeper first, then the two brokers
+    zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect)
+
+    // starts a broker with the given id and port, with numPartitions overridden to 4
+    def startBroker(id: Int, listenPort: Int): KafkaServer = {
+      val brokerProps = TestUtils.createBrokerConfig(id, listenPort)
+      TestUtils.createServer(new KafkaConfig(brokerProps) {
+        override val numPartitions = 4
+      })
+    }
+    server1 = startBroker(brokerId1, port1)
+    server2 = startBroker(brokerId2, port2)
+
+    // builds a message set holding the single message "test"
+    def seedMessages(): ByteBufferMessageSet = {
+      val seed = new java.util.ArrayList[Message]
+      seed.add(new Message("test".getBytes()))
+      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = seed)
+    }
+
+    val producerProps = new Properties()
+    producerProps.put("host", "localhost")
+    producerProps.put("port", port1.toString)
+
+    producer1 = new SyncProducer(new SyncProducerConfig(producerProps))
+    producer1.send("test-topic", seedMessages())
+
+    // same properties, but with the port overridden to port2
+    producer2 = new SyncProducer(new SyncProducerConfig(producerProps) {
+      override val port = port2
+    })
+    producer2.send("test-topic", seedMessages())
+
+    consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024)
+    consumer2 = new SimpleConsumer("localhost", port2, 1000000, 64*1024)
+
+    // temporarily set request handler logger to a higher level
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    Thread.sleep(500)
+  }
+
+  /**
+   * Sets the request handler logger back to ERROR, shuts down both brokers,
+   * deletes their log directories and finally stops the embedded ZooKeeper.
+   */
+  @After
+  def tearDown() {
+    // setUp() raised this logger to FATAL; put it back to ERROR
+    requestHandlerLogger.setLevel(Level.ERROR)
+
+    // shut both brokers down before deleting either log directory
+    val brokers = List(server1, server2)
+    brokers.foreach(broker => broker.shutdown)
+    brokers.foreach(broker => Utils.rm(broker.config.logDir))
+
+    Thread.sleep(500)
+    zkServer.shutdown
+  }
+
+  /**
+   * Sends one message with key "test" through a ZooKeeper-configured Producer
+   * backed by two mocked sync producers, and verifies that the message goes to
+   * partition 0 through the mock registered for brokerId2, and that both mocks
+   * are closed when the producer is closed.
+   */
+  @Test
+  def testSend() {
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+
+    val config = new ProducerConfig(props)
+    val partitioner = new StaticPartitioner
+    val serializer = new StringSerializer
+
+    // 2 sync producers
+    val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+    val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    val syncProducer2 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    // it should send to partition 0 (first partition) on second broker i.e broker2
+    val messageList = new java.util.ArrayList[Message]
+    messageList.add(new Message("test1".getBytes()))
+    // record phase: each mock call is followed directly by expectLastCall
+    syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+    EasyMock.expectLastCall
+    syncProducer1.close
+    EasyMock.expectLastCall
+    syncProducer2.close
+    EasyMock.expectLastCall
+    // switch both mocks from recording to replay
+    EasyMock.replay(syncProducer1)
+    EasyMock.replay(syncProducer2)
+
+    syncProducers.put(brokerId1, syncProducer1)
+    syncProducers.put(brokerId2, syncProducer2)
+
+    // pool holding only the mocked sync producers; the async producer map is empty
+    val producerPool = new ProducerPool[String](config, serializer, syncProducers,
+      new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    val producer = new Producer[String, String](config, partitioner, producerPool, false)
+
+    val messagesContent = new java.util.ArrayList[String]
+    messagesContent.add("test1")
+    producer.send(new ProducerData[String, String](topic, "test", messagesContent))
+    producer.close
+
+    // fails if any recorded call (the send, or either close) did not happen
+    EasyMock.verify(syncProducer1)
+    EasyMock.verify(syncProducer2)
+  }
+
+  /**
+   * Sends a ProducerData built from just a topic and the single value "t"
+   * through a broker.list-configured Producer and verifies that the one mocked
+   * sync producer receives it with partition -1 and is closed afterwards.
+   */
+  @Test
+  def testSendSingleMessage() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("broker.list", "0:localhost:9092")
+
+
+    val config = new ProducerConfig(props)
+    val partitioner = new StaticPartitioner
+    val serializer = new StringSerializer
+
+    // a single mocked sync producer, registered under brokerId1
+    val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+    val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    // it should send to a random partition (expected as partition -1) on the first broker
+    val messageList = new java.util.ArrayList[Message]
+    messageList.add(new Message("t".getBytes()))
+    // record phase: each mock call is followed directly by expectLastCall
+    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+    EasyMock.expectLastCall
+    syncProducer1.close
+    EasyMock.expectLastCall
+    EasyMock.replay(syncProducer1)
+
+    syncProducers.put(brokerId1, syncProducer1)
+
+    // pool holding only the mocked sync producer; the async producer map is empty
+    val producerPool = new ProducerPool[String](config, serializer, syncProducers,
+      new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    val producer = new Producer[String, String](config, partitioner, producerPool, false)
+
+    producer.send(new ProducerData[String, String](topic, "t"))
+    producer.close
+
+    // fails if the recorded send or close did not happen
+    EasyMock.verify(syncProducer1)
+  }
+
+  /**
+   * A Producer configured with "kafka.producer.NegativePartitioner" must reject
+   * a send with InvalidPartitionException. The producer is closed in all cases
+   * so that it does not outlive the test.
+   */
+  @Test
+  def testInvalidPartition() {
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.NegativePartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    val config = new ProducerConfig(props)
+
+    val richProducer = new Producer[String, String](config)
+    try {
+      val messagesContent = new java.util.ArrayList[String]
+      messagesContent.add("test")
+      richProducer.send(new ProducerData[String, String](topic, "test", messagesContent))
+      Assert.fail("Should fail with InvalidPartitionException")
+    } catch {
+      case e: InvalidPartitionException => // expected, do nothing
+    } finally {
+      // previously the producer was never closed, whether the test passed or failed
+      richProducer.close
+    }
+  }
+
+  /**
+   * With producer.type left at its default, data addressed to partition 0 of
+   * brokerId1 must be sent through that broker's sync producer, and closing the
+   * pool must close both sync producers.
+   */
+  @Test
+  def testSyncProducerPool() {
+    // default for producer.type is "sync"
+    val poolProps = new Properties()
+    poolProps.put("partitioner.class", "kafka.producer.NegativePartitioner")
+    poolProps.put("serializer.class", "kafka.producer.StringSerializer")
+
+    // mock for the first broker: expects one send to partition 0, then close
+    val firstBrokerProducer = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    val expectedPayload = new java.util.ArrayList[Message]
+    expectedPayload.add(new Message("test1".getBytes()))
+    firstBrokerProducer.send("test-topic", 0,
+      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = expectedPayload))
+    EasyMock.expectLastCall
+    firstBrokerProducer.close
+    EasyMock.expectLastCall
+
+    // mock for the second broker: expects only close
+    val secondBrokerProducer = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    secondBrokerProducer.close
+    EasyMock.expectLastCall
+
+    EasyMock.replay(firstBrokerProducer)
+    EasyMock.replay(secondBrokerProducer)
+
+    val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+    syncProducers.put(brokerId1, firstBrokerProducer)
+    syncProducers.put(brokerId2, secondBrokerProducer)
+
+    val pool = new ProducerPool[String](new ProducerConfig(poolProps), new StringSerializer,
+      syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    val poolData = pool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))
+    pool.send(poolData)
+    pool.close
+
+    EasyMock.verify(firstBrokerProducer)
+    EasyMock.verify(secondBrokerProducer)
+  }
+
+  /**
+   * With producer.type set to "async", data addressed to partition 0 of
+   * brokerId1 must be handed to that broker's async producer, and closing the
+   * pool must close both async producers.
+   */
+  @Test
+  def testAsyncProducerPool() {
+    // change producer.type to "async"
+    val poolProps = new Properties()
+    poolProps.put("partitioner.class", "kafka.producer.NegativePartitioner")
+    poolProps.put("serializer.class", "kafka.producer.StringSerializer")
+    poolProps.put("producer.type", "async")
+
+    // mock for the first broker: expects one send of "test1" to partition 0, then close
+    val firstBrokerProducer = EasyMock.createMock(classOf[AsyncProducer[String]])
+    firstBrokerProducer.send(topic, "test1", 0)
+    EasyMock.expectLastCall
+    firstBrokerProducer.close
+    EasyMock.expectLastCall
+
+    // mock for the second broker: expects only close
+    val secondBrokerProducer = EasyMock.createMock(classOf[AsyncProducer[String]])
+    secondBrokerProducer.close
+    EasyMock.expectLastCall
+
+    EasyMock.replay(firstBrokerProducer)
+    EasyMock.replay(secondBrokerProducer)
+
+    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+    asyncProducers.put(brokerId1, firstBrokerProducer)
+    asyncProducers.put(brokerId2, secondBrokerProducer)
+
+    val pool = new ProducerPool[String](new ProducerConfig(poolProps), new StringSerializer,
+      new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers)
+    val poolData = pool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))
+    pool.send(poolData)
+    pool.close
+
+    EasyMock.verify(firstBrokerProducer)
+    EasyMock.verify(secondBrokerProducer)
+  }
+
+  /**
+   * When the pool holds a sync producer only for brokerId2, sending to a
+   * partition on brokerId1 must fail with UnavailableProducerException; closing
+   * the pool must still close the registered producer.
+   */
+  @Test
+  def testSyncUnavailableProducerException() {
+    // default for producer.type is "sync"
+    val poolProps = new Properties()
+    poolProps.put("partitioner.class", "kafka.producer.NegativePartitioner")
+    poolProps.put("serializer.class", "kafka.producer.StringSerializer")
+
+    // this mock is never registered with the pool and expects no calls at all
+    val unregisteredProducer = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    // the only registered producer; it expects just close
+    val registeredProducer = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    registeredProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(unregisteredProducer)
+    EasyMock.replay(registeredProducer)
+
+    val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+    syncProducers.put(brokerId2, registeredProducer)
+
+    val pool = new ProducerPool[String](new ProducerConfig(poolProps), new StringSerializer,
+      syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    try {
+      val poolData = pool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))
+      pool.send(poolData)
+      Assert.fail("Should fail with UnavailableProducerException")
+    } catch {
+      case e: UnavailableProducerException => // expected
+    }
+
+    pool.close
+    EasyMock.verify(unregisteredProducer)
+    EasyMock.verify(registeredProducer)
+  }
+
+  /**
+   * When the pool holds an async producer only for brokerId2, sending to a
+   * partition on brokerId1 must fail with UnavailableProducerException; closing
+   * the pool must still close the registered producer.
+   */
+  @Test
+  def testAsyncUnavailableProducerException() {
+    // change producer.type to "async"
+    val poolProps = new Properties()
+    poolProps.put("partitioner.class", "kafka.producer.NegativePartitioner")
+    poolProps.put("serializer.class", "kafka.producer.StringSerializer")
+    poolProps.put("producer.type", "async")
+
+    // this mock is never registered with the pool and expects no calls at all
+    val unregisteredProducer = EasyMock.createMock(classOf[AsyncProducer[String]])
+    // the only registered producer; it expects just close
+    val registeredProducer = EasyMock.createMock(classOf[AsyncProducer[String]])
+    registeredProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(unregisteredProducer)
+    EasyMock.replay(registeredProducer)
+
+    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+    asyncProducers.put(brokerId2, registeredProducer)
+
+    val pool = new ProducerPool[String](new ProducerConfig(poolProps), new StringSerializer,
+      new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers)
+    try {
+      val poolData = pool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))
+      pool.send(poolData)
+      Assert.fail("Should fail with UnavailableProducerException")
+    } catch {
+      case e: UnavailableProducerException => // expected
+    }
+
+    pool.close
+    EasyMock.verify(unregisteredProducer)
+    EasyMock.verify(registeredProducer)
+  }
+
+  /**
+   * Building a ProducerConfig that sets both partitioner.class and a
+   * broker.list must be rejected with InvalidConfigException.
+   */
+  @Test
+  def testConfigBrokerPartitionInfoWithPartitioner {
+    // one "id:localhost:port:4" entry per broker, comma separated
+    val brokerList = List(brokerId1 + ":" + "localhost" + ":" + port1 + ":" + 4,
+                          brokerId2 + ":" + "localhost" + ":" + port2 + ":" + 4).mkString(",")
+
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("producer.type", "async")
+    props.put("broker.list", brokerList)
+
+    try {
+      // the constructor itself is expected to throw
+      new ProducerConfig(props)
+      fail("should fail with InvalidConfigException due to presence of partitioner.class and broker.list")
+    } catch {
+      case e: InvalidConfigException => // expected
+    }
+  }
+
+  /**
+   * With a broker.list naming only brokerId1 and producer.type "async", a send
+   * through the Producer must reach the single mocked async producer with
+   * partition -1, and closing the Producer must close that mock.
+   */
+  @Test
+  def testConfigBrokerPartitionInfo() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("producer.type", "async")
+    props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
+
+    val config = new ProducerConfig(props)
+    val partitioner = new StaticPartitioner
+    val serializer = new StringSerializer
+
+    // a single mocked async producer, registered under brokerId1
+    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
+    // it should send "test1" with partition -1 through the producer of the first broker
+    asyncProducer1.send(topic, "test1", -1)
+    EasyMock.expectLastCall
+    asyncProducer1.close
+    EasyMock.expectLastCall
+    EasyMock.replay(asyncProducer1)
+
+    asyncProducers.put(brokerId1, asyncProducer1)
+
+    // pool holding only the mocked async producer; the sync producer map is empty
+    val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, kafka.producer.SyncProducer](),
+      asyncProducers)
+    val producer = new Producer[String, String](config, partitioner, producerPool, false)
+
+    val messagesContent = new java.util.ArrayList[String]
+    messagesContent.add("test1")
+    producer.send(new ProducerData[String, String](topic, "test1", messagesContent))
+    producer.close
+
+    // fails if the recorded send or close did not happen
+    EasyMock.verify(asyncProducer1)
+  }
+
+  /**
+   * Sends the same message twice to the previously unused topic "new-topic"
+   * through a ZooKeeper-configured Producer and checks, by fetching through
+   * consumer1 and consumer2, that each broker holds one "test1" message.
+   *
+   * Unexpected exceptions now propagate out of the test with their original
+   * type and stack trace instead of being replaced by a bare "Not expected"
+   * failure, and the producer is closed even when the test fails.
+   */
+  @Test
+  def testZKSendToNewTopic() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+
+    val producer = new Producer[String, String](config)
+    try {
+      import scala.collection.JavaConversions._
+      producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1"))))
+      Thread.sleep(100)
+      producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1"))))
+      Thread.sleep(100)
+      // cross check if brokers got the messages
+      val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+      Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+      Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
+      Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message)
+    } finally {
+      // always release the producer, also when a send or an assertion failed
+      producer.close
+    }
+  }
+
+  /**
+   * Sends a message to "new-topic", shuts down the second broker, sends the
+   * same message again and checks, by fetching through consumer1, that the
+   * first broker holds two "test1" messages.
+   *
+   * Unexpected exceptions now propagate out of the test with their original
+   * type and stack trace instead of being replaced by a bare "Not expected"
+   * failure, and the producer is closed even when the test fails.
+   */
+  @Test
+  def testZKSendWithDeadBroker() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+
+    val producer = new Producer[String, String](config)
+    try {
+      import scala.collection.JavaConversions._
+      producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1"))))
+      Thread.sleep(100)
+      // kill 2nd broker
+      server2.shutdown
+      Thread.sleep(100)
+      producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1"))))
+      Thread.sleep(100)
+      // cross check if brokers got the messages
+      val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+      Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      Assert.assertTrue("Message set should have another message", messageSet1.hasNext)
+      Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+    } finally {
+      // always release the producer, also when a send or an assertion failed
+      producer.close
+    }
+  }
+
+  /**
+   * Sends twice to the topic "test-topic1" through a Producer backed by two
+   * mocked sync producers, with a real send through producer1 in between, and
+   * verifies that both producer sends arrive at partition 0 of the mock
+   * registered for brokerId1 and that both mocks are closed.
+   */
+  @Test
+  def testPartitionedSendToNewTopic() {
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+    val partitioner = new StaticPartitioner
+    val serializer = new StringEncoder
+
+    // 2 sync producers
+    val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+    val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    val syncProducer2 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    import scala.collection.JavaConversions._
+    // record phase: the same send to partition 0 is expected twice on syncProducer1
+    syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                                  messages = asList(Array(new Message("test1".getBytes)))))
+    EasyMock.expectLastCall
+    syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                                  messages = asList(Array(new Message("test1".getBytes)))))
+    EasyMock.expectLastCall
+    syncProducer1.close
+    EasyMock.expectLastCall
+    syncProducer2.close
+    EasyMock.expectLastCall
+    EasyMock.replay(syncProducer1)
+    EasyMock.replay(syncProducer2)
+
+    syncProducers.put(brokerId1, syncProducer1)
+    syncProducers.put(brokerId2, syncProducer2)
+
+    // pool holding only the mocked sync producers; the async producer map is empty
+    val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    val producer = new Producer[String, String](config, partitioner, producerPool, false)
+
+    // first send through the mocked pool, with key "test"
+    producer.send(new ProducerData[String, String]("test-topic1", "test", asList(Array("test1"))))
+    Thread.sleep(100)
+
+    // now send again to this topic using a real producer, this time all brokers would have registered
+    // their partitions in zookeeper
+    producer1.send("test-topic1", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                           messages = asList(Array(new Message("test".getBytes())))))
+    Thread.sleep(100)
+
+    // wait for zookeeper to register the new topic
+    // second send through the mocked pool, this time with key "test1"
+    producer.send(new ProducerData[String, String]("test-topic1", "test1", asList(Array("test1"))))
+    Thread.sleep(100)
+    producer.close
+
+    // fails if any recorded call (the two sends, or either close) did not happen
+    EasyMock.verify(syncProducer1)
+    EasyMock.verify(syncProducer2)
+  }
+
+  /**
+   * Starts a third broker (id 2, port 9094) after the Producer has been
+   * created, registers it under "test-topic" by sending one message to it
+   * directly, and then verifies that a send through the Producer reaches
+   * partition 2 of the mocked sync producer registered for broker id 2.
+   *
+   * The third broker, its log directory and the temporary sync producer are
+   * cleaned up in finally blocks, so a failing assertion no longer leaves a
+   * broker bound to port 9094.
+   */
+  @Test
+  def testPartitionedSendToNewBrokerInExistingTopic() {
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+    val partitioner = new StaticPartitioner
+    val serializer = new StringSerializer
+
+    // 3 mocked sync producers, registered under broker ids brokerId1, brokerId2 and 2
+    val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+    val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    val syncProducer2 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    val syncProducer3 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+    val messages1 = new java.util.ArrayList[Message]
+    messages1.add(new Message("test1".getBytes()))
+    // only the producer of the new broker expects a send; all three expect close
+    syncProducer3.send("test-topic", 2, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messages1))
+    EasyMock.expectLastCall
+    syncProducer1.close
+    EasyMock.expectLastCall
+    syncProducer2.close
+    EasyMock.expectLastCall
+    syncProducer3.close
+    EasyMock.expectLastCall
+    EasyMock.replay(syncProducer1)
+    EasyMock.replay(syncProducer2)
+    EasyMock.replay(syncProducer3)
+
+    syncProducers.put(brokerId1, syncProducer1)
+    syncProducers.put(brokerId2, syncProducer2)
+    syncProducers.put(2, syncProducer3)
+
+    val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    val producer = new Producer[String, String](config, partitioner, producerPool, false)
+
+    val serverProps = TestUtils.createBrokerConfig(2, 9094)
+    val serverConfig = new KafkaConfig(serverProps) {
+      override val numPartitions = 4
+    }
+    val server3 = TestUtils.createServer(serverConfig)
+    try {
+      // send a message to the new broker to register it under topic "test-topic"
+      val tempProps = new Properties()
+      tempProps.put("host", "localhost")
+      tempProps.put("port", "9094")
+      val tempProducer = new kafka.producer.SyncProducer(new SyncProducerConfig(tempProps))
+      try {
+        val messageList = new java.util.ArrayList[Message]
+        messageList.add(new Message("test".getBytes()))
+        tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+
+        Thread.sleep(500)
+
+        val messagesContent = new java.util.ArrayList[String]
+        messagesContent.add("test1")
+        producer.send(new ProducerData[String, String]("test-topic", "test-topic", messagesContent))
+        producer.close
+
+        EasyMock.verify(syncProducer1)
+        EasyMock.verify(syncProducer2)
+        EasyMock.verify(syncProducer3)
+      } finally {
+        // the temporary producer was previously never closed
+        tempProducer.close
+      }
+    } finally {
+      // stop the extra broker and delete its log directory even when the test fails
+      server3.shutdown
+      Utils.rm(server3.config.logDir)
+    }
+  }
+
+  /**
+   * With a broker.list naming only brokerId1, producer.type "async" and a
+   * DefaultPartitioner, a send through the Producer must reach the single
+   * mocked async producer with partition -1, and closing the Producer must
+   * close that mock.
+   */
+  @Test
+  def testDefaultPartitioner() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("producer.type", "async")
+    props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
+    val config = new ProducerConfig(props)
+    val partitioner = new DefaultPartitioner[String]
+    val serializer = new StringSerializer
+
+    // a single mocked async producer, registered under brokerId1
+    // (a second mock used to be created here but was never recorded, replayed or registered)
+    val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+    val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
+    // it should send "test1" with partition -1 through the producer of the first broker
+    asyncProducer1.send(topic, "test1", -1)
+    EasyMock.expectLastCall
+    asyncProducer1.close
+    EasyMock.expectLastCall
+    EasyMock.replay(asyncProducer1)
+
+    asyncProducers.put(brokerId1, asyncProducer1)
+
+    // pool holding only the mocked async producer; the sync producer map is empty
+    val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, kafka.producer.SyncProducer](),
+      asyncProducers)
+    val producer = new Producer[String, String](config, partitioner, producerPool, false)
+
+    val messagesContent = new java.util.ArrayList[String]
+    messagesContent.add("test1")
+    producer.send(new ProducerData[String, String](topic, "test", messagesContent))
+    producer.close
+
+    // fails if the recorded send or close did not happen
+    EasyMock.verify(asyncProducer1)
+  }
+}
+
+/**
+ * Encoder for String events used by the producer tests.
+ */
+class StringSerializer extends Encoder[String] {
+  /** Returns the message's toString representation. */
+  def toEvent(message: Message):String = {
+    message.toString
+  }
+
+  /** Wraps the bytes of the event string in a new Message. */
+  def toMessage(event: String):Message = {
+    val payload = event.getBytes
+    new Message(payload)
+  }
+
+  /** Returns the event with "-topic" appended. */
+  def getTopic(event: String): String = {
+    event.concat("-topic")
+  }
+}
+
+/**
+ * Partitioner that ignores its input and always answers -1.
+ */
+class NegativePartitioner extends Partitioner[String] {
+  /** Always returns -1, whatever the data and partition count. */
+  def partition(data: String, numPartitions: Int): Int = -1
+}
+
+/**
+ * Partitioner that derives the partition from the length of the data string.
+ */
+class StaticPartitioner extends Partitioner[String] {
+  /** Returns the length of data modulo numPartitions. */
+  def partition(data: String, numPartitions: Int): Int = {
+    val length = data.length
+    length % numPartitions
+  }
+}
+
+/**
+ * Partitioner that derives the partition from the hash code of the data string.
+ */
+class HashPartitioner extends Partitioner[String] {
+  /**
+   * Returns a partition in the range [0, numPartitions) for a positive
+   * numPartitions.
+   *
+   * String.hashCode can be negative, and the remainder of a negative number is
+   * negative as well, so the raw remainder is shifted back into range. Results
+   * for non-negative hash codes are unchanged.
+   */
+  def partition(data: String, numPartitions: Int): Int = {
+    val remainder = data.hashCode % numPartitions
+    if (remainder < 0) remainder + numPartitions else remainder
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.producer
+
+import junit.framework.{Assert, TestCase}
+import kafka.utils.SystemTime
+import kafka.utils.TestUtils
+import kafka.server.{KafkaServer, KafkaConfig}
+import org.apache.log4j.{Logger, Level}
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import java.util.Properties
+import kafka.producer.SyncProducerConfig
+import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.javaapi.ProducerRequest
+import kafka.message.{NoCompressionCodec, Message}
+
+class SyncProducerTest extends JUnitSuite {
+  // two-byte payload used for every Message sent by the tests in this class
+  private var messageBytes =  new Array[Byte](2);
+  // broker under test; created in setUp() and shut down in tearDown()
+  private var server: KafkaServer = null
+  // logger of kafka.producer.SyncProducer; tests that expect failures set it to FATAL while they run
+  val simpleProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
+
+  /**
+   * Starts a broker with id 0 on port 9092, with enableZookeeper overridden to false.
+   */
+  @Before
+  def setUp() {
+    val brokerProps = TestUtils.createBrokerConfig(0, 9092)
+    val brokerConfig = new KafkaConfig(brokerProps) {
+      override val enableZookeeper = false
+    }
+    server = TestUtils.createServer(brokerConfig)
+  }
+
+  /**
+   * Shuts the broker down and deletes its log directory, so that repeated test
+   * runs do not leave broker log directories behind.
+   */
+  @After
+  def tearDown() {
+    server.shutdown
+    // fully qualified because this file does not import kafka.utils.Utils
+    kafka.utils.Utils.rm(server.config.logDir)
+  }
+
+  /**
+   * Sending to the host "NOT_USED" must fail, twice in a row, and each attempt
+   * must give up in under 300 ms (connect.timeout.ms is set to 300).
+   *
+   * The second timing check now measures secondEnd - secondStart (it used to
+   * compare secondEnd with itself and could never fail), the second failure is
+   * asserted instead of only being recorded, and the producer logger level is
+   * restored even when an assertion fails.
+   */
+  @Test
+  def testUnreachableServer() {
+    val props = new Properties()
+    props.put("host", "NOT_USED")
+    props.put("port", "9092")
+    props.put("buffer.size", "102400")
+    props.put("connect.timeout.ms", "300")
+    props.put("reconnect.interval", "1000")
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+
+    // attempts one send and reports whether it threw an exception
+    def sendFails(): Boolean = {
+      try {
+        producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                          messages = getMessageList(new Message(messageBytes))))
+        false
+      } catch {
+        case e: Exception => true
+      }
+    }
+
+    //temporarily increase log4j level to avoid error in output
+    simpleProducerLogger.setLevel(Level.FATAL)
+    try {
+      val firstStart = SystemTime.milliseconds
+      Assert.assertTrue(sendFails())
+      val firstEnd = SystemTime.milliseconds
+      println("First message send retries took " + (firstEnd-firstStart) + " ms")
+      Assert.assertTrue((firstEnd-firstStart) < 300)
+
+      val secondStart = SystemTime.milliseconds
+      Assert.assertTrue(sendFails())
+      val secondEnd = SystemTime.milliseconds
+      println("Second message send retries took " + (secondEnd-secondStart) + " ms")
+      Assert.assertTrue((secondEnd-secondStart) < 300)
+    } finally {
+      simpleProducerLogger.setLevel(Level.ERROR)
+    }
+  }
+
+  /**
+   * Two single sends and one multiSend to the broker on localhost:9092 must
+   * all succeed, and each of the two single sends must finish in under 500 ms
+   * (connect.timeout.ms is set to 500).
+   *
+   * The second timing check now measures secondEnd - secondStart; it used to
+   * compare secondEnd with itself and could never fail.
+   */
+  @Test
+  def testReachableServer() {
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9092")
+    props.put("buffer.size", "102400")
+    props.put("connect.timeout.ms", "500")
+    props.put("reconnect.interval", "1000")
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+
+    // runs the given send and reports whether it threw an exception
+    def throwsException(send: => Unit): Boolean = {
+      try {
+        send
+        false
+      } catch {
+        case e: Exception => true
+      }
+    }
+
+    // sends one message holding messageBytes to partition 0 of topic "test"
+    def sendSingle() {
+      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                        messages = getMessageList(new Message(messageBytes))))
+    }
+
+    val firstStart = SystemTime.milliseconds
+    Assert.assertFalse(throwsException(sendSingle()))
+    val firstEnd = SystemTime.milliseconds
+    Assert.assertTrue((firstEnd-firstStart) < 500)
+
+    val secondStart = SystemTime.milliseconds
+    Assert.assertFalse(throwsException(sendSingle()))
+    val secondEnd = SystemTime.milliseconds
+    Assert.assertTrue((secondEnd-secondStart) < 500)
+
+    // the same message again, this time wrapped in a ProducerRequest and sent via multiSend
+    Assert.assertFalse(throwsException(
+      producer.multiSend(Array(new ProducerRequest("test", 0,
+        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                 messages = getMessageList(new Message(messageBytes))))))))
+  }
+
+  /**
+   * Sending to localhost on port 9091 (the broker from setUp() is started on
+   * 9092) must fail, twice in a row, and each attempt must give up in under
+   * 300 ms (connect.timeout.ms is set to 300).
+   *
+   * The second timing check now measures secondEnd - secondStart (it used to
+   * compare secondEnd with itself and could never fail), and the producer
+   * logger level is restored even when an assertion fails.
+   */
+  @Test
+  def testReachableServerWrongPort() {
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9091")
+    props.put("buffer.size", "102400")
+    props.put("connect.timeout.ms", "300")
+    props.put("reconnect.interval", "500")
+    val producer = new SyncProducer(new SyncProducerConfig(props))
+
+    // attempts one send and reports whether it threw an exception
+    def sendFails(): Boolean = {
+      try {
+        producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                          messages = getMessageList(new Message(messageBytes))))
+        false
+      } catch {
+        case e: Exception => true
+      }
+    }
+
+    //temporarily increase log4j level to avoid error in output
+    simpleProducerLogger.setLevel(Level.FATAL)
+    try {
+      val firstStart = SystemTime.milliseconds
+      Assert.assertTrue(sendFails())
+      val firstEnd = SystemTime.milliseconds
+      Assert.assertTrue((firstEnd-firstStart) < 300)
+
+      val secondStart = SystemTime.milliseconds
+      Assert.assertTrue(sendFails())
+      val secondEnd = SystemTime.milliseconds
+      Assert.assertTrue((secondEnd-secondStart) < 300)
+    } finally {
+      simpleProducerLogger.setLevel(Level.ERROR)
+    }
+  }
+
+  /**
+   * Wraps the given message in a new single-element java.util.ArrayList.
+   */
+  private def getMessageList(message: Message): java.util.List[Message] = {
+    val single = new java.util.ArrayList[Message]()
+    val added = single.add(message)
+    single
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io._
+import junit.framework.Assert._
+import kafka.server.KafkaConfig
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import kafka.utils.{Utils, MockTime, TestUtils}
+import kafka.common.OffsetOutOfRangeException
+
+/**
+ * Tests for LogManager: log creation, age-based cleanup, time-based flushing and
+ * per-topic partition counts. Every test starts from the LogManager built in setUp(),
+ * which uses a MockTime clock and has zookeeper disabled.
+ */
+class LogManagerTest extends JUnitSuite {
+
+  val time: MockTime = new MockTime()
+  val maxLogAge = 1000
+  var logDir: File = null
+  var logManager: LogManager = null
+  var config:KafkaConfig = null
+
+  /** Builds and starts a LogManager with 1024-byte log files and remembers its log directory. */
+  @Before
+  def setUp() {
+    val props = TestUtils.createBrokerConfig(0, -1)
+    config = new KafkaConfig(props) {
+                   override val logFileSize = 1024
+                   override val enableZookeeper = false
+                 }
+    logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+    logManager.startup
+    logDir = logManager.logDir
+  }
+
+  /** Closes the current LogManager and removes the log directory recorded in setUp(). */
+  @After
+  def tearDown() {
+    logManager.close()
+    Utils.rm(logDir)
+  }
+
+  /** A log can be created on demand and appended to. */
+  @Test
+  def testCreateLog() {
+    val log = logManager.getOrCreateLog("kafka", 0)
+    log.append(TestUtils.singleMessageSet("test".getBytes()))
+  }
+
+
+  /**
+   * After the mock clock is advanced past maxLogAge, cleanupLogs() must leave a single
+   * segment; reads at the old start offset must fail while the log stays appendable.
+   */
+  @Test
+  def testCleanup() {
+    val log = logManager.getOrCreateLog("cleanup", 0)
+    var offset = 0L
+    for(i <- 0 until 1000) {
+      val set = TestUtils.singleMessageSet("test".getBytes())
+      log.append(set)
+      offset += set.sizeInBytes
+    }
+    log.flush
+    // Why is this sleep required? The file system takes some time to update the last modified time of a file.
+    // TODO: it is unknown why 1 second or a couple of hundred milliseconds did not work.
+    Thread.sleep(2000)
+    assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
+    time.currentMs += maxLogAge + 3000
+    logManager.cleanupLogs()
+    assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
+    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
+    try {
+      log.read(0, 1024)
+      fail("Should get exception from fetching earlier.")
+    } catch {
+      case e: OffsetOutOfRangeException => "This is good."
+    }
+    // log should still be appendable
+    log.append(TestUtils.singleMessageSet("test".getBytes()))
+  }
+
+  /**
+   * Replaces the LogManager with one whose flushIntervalMap is built from
+   * "timebasedflush:100" and checks that the last flush happened less than 100 ms ago.
+   */
+  @Test
+  def testTimeBasedFlush() {
+    val props = TestUtils.createBrokerConfig(0, -1)
+    logManager.close
+    Thread.sleep(100)
+    config = new KafkaConfig(props) {
+                   override val logFileSize = 1024 *1024 *1024
+                   override val enableZookeeper = false
+                   override val flushSchedulerThreadRate = 50
+                   override val flushInterval = Int.MaxValue
+                   override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
+                 }
+    logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+    logManager.startup
+    val log = logManager.getOrCreateLog("timebasedflush", 0)
+    for(i <- 0 until 200) {
+      val set = TestUtils.singleMessageSet("test".getBytes())
+      log.append(set)
+    }
+
+    assertTrue("The last flush time has to be within defaultflushInterval of current time ",
+                     (System.currentTimeMillis - log.getLastFlushedTime) < 100)
+  }
+
+  /**
+   * Replaces the LogManager with one whose topicPartitionsMap is built from
+   * "testPartition:2". Partitions 0 and 1 must be usable; asking for partition 2 must fail.
+   */
+  @Test
+  def testConfigurablePartitions() {
+    val props = TestUtils.createBrokerConfig(0, -1)
+    logManager.close
+    Thread.sleep(100)
+    config = new KafkaConfig(props) {
+                   override val logFileSize = 256
+                   override val enableZookeeper = false
+                   override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
+                 }
+
+    logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+    logManager.startup
+
+    for(partition <- 0 until 2) {
+      val log = logManager.getOrCreateLog("testPartition", partition)
+      for(j <- 0 until 250) {
+        val set = TestUtils.singleMessageSet("test".getBytes())
+        log.append(set)
+      }
+    }
+
+    // fail() throws an Error, so it is not swallowed by the Exception handler below and the
+    // test really fails when the out-of-range partition is accepted.
+    // NOTE(review): the rejection is presumably an InvalidPartitionException - confirm and narrow the catch.
+    try {
+      logManager.getOrCreateLog("testPartition", 2)
+      fail("Should not come here")
+    } catch {
+      case e: Exception => "This is good."
+    }
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import junit.framework.TestCase
+import java.io.File
+import kafka.utils.TestUtils
+import kafka.utils.Utils
+import kafka.server.{KafkaConfig, KafkaServer}
+import junit.framework.Assert._
+import java.util.{Random, Properties}
+import kafka.api.{FetchRequest, OffsetRequest}
+import collection.mutable.WrappedArray
+import kafka.consumer.SimpleConsumer
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+
+/** Holds the random number generator shared by the LogOffsetTest test cases. */
+object LogOffsetTest {
+  val random = new Random
+}
+
+/**
+ * Tests offset lookups (getOffsetsBefore) both directly on a Log and through a
+ * SimpleConsumer talking to a KafkaServer started on brokerPort.
+ */
+class LogOffsetTest extends JUnitSuite {
+  var logDir: File = null
+  var topicLogDir: File = null
+  var server: KafkaServer = null
+  var logSize: Int = 100
+  val brokerPort: Int = 9099
+  var simpleConsumer: SimpleConsumer = null
+
+  /** Starts a broker on brokerPort and connects a SimpleConsumer to it. */
+  @Before
+  def setUp() {
+    val config: Properties = createBrokerConfig(1, brokerPort)
+    val logDirPath = config.getProperty("log.dir")
+    logDir = new File(logDirPath)
+
+    server = TestUtils.createServer(new KafkaConfig(config))
+    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
+  }
+
+  /** Closes the consumer, shuts the broker down and removes the broker's log directory. */
+  @After
+  def tearDown() {
+    simpleConsumer.close
+    server.shutdown
+    Utils.rm(logDir)
+  }
+
+  /** Fetching and offset lookups against a topic that has no data. */
+  @Test
+  def testEmptyLogs() {
+    val messageSet: ByteBufferMessageSet = simpleConsumer.fetch(
+      new FetchRequest("test", 0, 0, 300 * 1024))
+    assertFalse(messageSet.iterator.hasNext)
+
+    {
+      val offsets = simpleConsumer.getOffsetsBefore("test", 0, OffsetRequest.LatestTime, 10)
+      assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+    }
+
+    {
+      val offsets = simpleConsumer.getOffsetsBefore("test", 0, OffsetRequest.EarliestTime, 10)
+      assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+    }
+
+    {
+      val offsets = simpleConsumer.getOffsetsBefore("test", 0, 1295978400000L, 10)
+      assertTrue( 0 == offsets.length )
+    }
+
+  }
+
+  /**
+   * Appends 20 messages, then checks the offsets returned for LatestTime by the Log and
+   * by the consumer, and that a fetch at the newest offset returns no messages.
+   */
+  @Test
+  def testGetOffsetsBeforeLatestTime() {
+    val topic = "kafka"
+    val part = 0
+
+    val logManager = server.getLogManager
+    val log = logManager.getOrCreateLog(topic, part)
+
+    val message = new Message(Integer.toString(42).getBytes())
+    for(i <- 0 until 20)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+    log.flush()
+
+    Thread.sleep(100)
+
+    val offsetRequest = new OffsetRequest(topic, part, OffsetRequest.LatestTime, 10)
+
+    val offsets = log.getOffsetsBefore(offsetRequest)
+    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+
+    val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
+                                                          OffsetRequest.LatestTime, 10)
+    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
+
+    // try to fetch using latest offset
+    val messageSet: ByteBufferMessageSet = simpleConsumer.fetch(
+      new FetchRequest(topic, part, consumerOffsets.head, 300 * 1024))
+    assertFalse(messageSet.iterator.hasNext)
+  }
+
+  /**
+   * Creates an empty topic-partition directory inside the broker's log directory and checks
+   * that repeated EarliestTime lookups never report offset 1.
+   */
+  @Test
+  def testEmptyLogsGetOffsets() {
+    val topic = "kafka"
+    val part = LogOffsetTest.random.nextInt(10)
+    // Use the broker's own log directory: getLogDir hands out a different temp directory on
+    // every call, so a directory created under it would be unrelated to the running broker.
+    topicLogDir = new File(logDir, topic + "-" + part)
+    topicLogDir.mkdir
+
+    var offsetChanged = false
+    for(i <- 1 to 14) {
+      val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
+        OffsetRequest.EarliestTime, 1)
+
+      if(consumerOffsets(0) == 1) {
+        offsetChanged = true
+      }
+    }
+    assertFalse(offsetChanged)
+  }
+
+  /**
+   * Appends 20 messages and checks the offsets returned for a timestamp taken right after
+   * the flush, both from the Log and through the consumer.
+   */
+  @Test
+  def testGetOffsetsBeforeNow() {
+    val topic = "kafka"
+    val part = LogOffsetTest.random.nextInt(10)
+
+    val logManager = server.getLogManager
+    val log = logManager.getOrCreateLog(topic, part)
+    val message = new Message(Integer.toString(42).getBytes())
+    for(i <- 0 until 20)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+    log.flush()
+
+    val now = System.currentTimeMillis
+    Thread.sleep(100)
+
+    val offsetRequest = new OffsetRequest(topic, part, now, 10)
+    val offsets = log.getOffsetsBefore(offsetRequest)
+    assertTrue((Array(216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+
+    val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10)
+    assertTrue((Array(216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
+  }
+
+  /**
+   * Appends 20 messages and checks that an EarliestTime lookup returns only offset 0,
+   * both from the Log and through the consumer.
+   */
+  @Test
+  def testGetOffsetsBeforeEarliestTime() {
+    val topic = "kafka"
+    val part = LogOffsetTest.random.nextInt(10)
+
+    val logManager = server.getLogManager
+    val log = logManager.getOrCreateLog(topic, part)
+    val message = new Message(Integer.toString(42).getBytes())
+    for(i <- 0 until 20)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+    log.flush()
+
+    Thread.sleep(100)
+
+    val offsetRequest = new OffsetRequest(topic, part,
+                                          OffsetRequest.EarliestTime, 10)
+    val offsets = log.getOffsetsBefore(offsetRequest)
+
+    assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+
+    val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
+                                                          OffsetRequest.EarliestTime, 10)
+    // verify the value that came back through the consumer, not the Log result again
+    assertTrue( (Array(0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]) )
+  }
+
+  /**
+   * Builds the broker properties used by setUp().
+   *
+   * @param nodeId value for the "brokerid" property
+   * @param port   value for the "port" property
+   * @return properties with zookeeper disabled, a fresh temp log directory and
+   *         "log.file.size" taken from logSize
+   */
+  private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+    val props = new Properties
+    props.put("brokerid", nodeId.toString)
+    props.put("port", port.toString)
+    props.put("log.dir", getLogDir.getAbsolutePath)
+    props.put("log.flush.interval", "1")
+    props.put("enable.zookeeper", "false")
+    props.put("num.partitions", "20")
+    props.put("log.retention.hours", "10")
+    props.put("log.cleanup.interval.mins", "5")
+    props.put("log.file.size", logSize.toString)
+    props
+  }
+
+  /** Returns the result of TestUtils.tempDir(); every call asks for another directory. */
+  private def getLogDir(): File = {
+    val dir = TestUtils.tempDir()
+    dir
+  }
+
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io._
+import java.nio._
+import java.util.ArrayList
+import junit.framework.Assert._
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import kafka.utils.{Utils, TestUtils, Range}
+import kafka.common.OffsetOutOfRangeException
+import kafka.message.{NoCompressionCodec, MessageSet, ByteBufferMessageSet, Message}
+
+/**
+ * Tests for Log: loading existing segment files, appending and reading, out-of-range
+ * reads, segment rolling and the Log.findRange lookup.
+ */
+class LogTest extends JUnitSuite {
+
+  var logDir: File = null
+
+  /** Gives every test its own temp directory. */
+  @Before
+  def setUp() {
+    logDir = TestUtils.tempDir()
+  }
+
+  /** Removes the directory created in setUp(). */
+  @After
+  def tearDown() {
+    Utils.rm(logDir)
+  }
+
+  /** Creates one empty file per offset in `dir`, named with the offset plus Log.FileSuffix. */
+  def createEmptyLogs(dir: File, offsets: Int*) = {
+    for(offset <- offsets)
+      new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile()
+  }
+
+  /** A directory holding a single empty segment file at offset 0 can be opened. */
+  @Test
+  def testLoadEmptyLog() {
+    createEmptyLogs(logDir, 0)
+    new Log(logDir, 1024, 1000, false)
+  }
+
+  /** Empty segment files at offsets 0 and 15 must be rejected with IllegalStateException. */
+  @Test
+  def testLoadInvalidLogsFails() {
+    createEmptyLogs(logDir, 0, 15)
+    try {
+      new Log(logDir, 1024, 1000, false)
+      fail("Allowed load of corrupt logs without complaint.")
+    } catch {
+      case e: IllegalStateException => "This is good"
+    }
+  }
+
+  /** Ten appended messages are read back unchanged. */
+  @Test
+  def testAppendAndRead() {
+    val log = new Log(logDir, 1024, 1000, false)
+    val message = new Message(Integer.toString(42).getBytes())
+    for(i <- 0 until 10)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+    log.flush()
+    val messages = log.read(0, 1024)
+    var current = 0
+    for(curr <- messages) {
+      assertEquals("Read message should equal written", message, curr.message)
+      current += 1
+    }
+    assertEquals(10, current)
+  }
+
+  /**
+   * With a single empty segment at offset 1024, reading at 1024 yields an empty result while
+   * reading at 0 or at 1025 raises OffsetOutOfRangeException.
+   */
+  @Test
+  def testReadOutOfRange() {
+    createEmptyLogs(logDir, 1024)
+    val log = new Log(logDir, 1024, 1000, false)
+    assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
+    try {
+      log.read(0, 1024)
+      fail("Expected exception on invalid read.")
+    } catch {
+      case e: OffsetOutOfRangeException => "This is good."
+    }
+    try {
+      log.read(1025, 1000)
+      fail("Expected exception on invalid read.")
+    } catch {
+      case e: OffsetOutOfRangeException => "This is good."
+    }
+  }
+
+  /** Test that writing and reading beyond the log size boundary works */
+  @Test
+  def testLogRolls() {
+    /* create a multipart log with 100 messages */
+    val log = new Log(logDir, 100, 1000, false)
+    val numMessages = 100
+    for(i <- 0 until numMessages)
+      log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
+    log.flush
+
+    /* now do successive reads and iterate over the resulting message sets counting the messages
+     * we should find exactly 100 messages.
+     */
+    var reads = 0
+    var current = 0
+    var readOffset = 0L
+    while(current < numMessages) {
+      val messages = log.read(readOffset, 1024*1024)
+      readOffset += messages.last.offset
+      current += messages.size
+      if(reads > 2*numMessages)
+        fail("Too many read attempts.")
+      reads += 1
+    }
+    assertEquals("We did not find all the messages we put in", numMessages, current)
+  }
+
+  /** Exercises Log.findRange on empty, boundary, out-of-range and in-range lookups. */
+  @Test
+  def testFindSegment() {
+    assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45))
+    assertEquals("Search in segment list just outside the range of the last segment should find nothing",
+                 None, Log.findRange(makeRanges(5, 9, 12), 12))
+    try {
+      Log.findRange(makeRanges(35), 36)
+      fail("expect exception")
+    }
+    catch {
+      case e: OffsetOutOfRangeException => "this is good"
+    }
+
+    try {
+      Log.findRange(makeRanges(35,35), 36)
+      // without this the block passes whether or not the lookup is rejected
+      fail("expect exception")
+    }
+    catch {
+      case e: OffsetOutOfRangeException => "this is good"
+    }
+
+    assertContains(makeRanges(5, 9, 12), 11)
+    assertContains(makeRanges(5), 4)
+    assertContains(makeRanges(5,8), 5)
+    assertContains(makeRanges(5,8), 6)
+  }
+
+  /** Test corner cases of rolling logs */
+  @Test
+  def testEdgeLogRolls() {
+    {
+      // first test a log segment starting at 0
+      val log = new Log(logDir, 100, 1000, false)
+      val curOffset = log.nextAppendOffset
+      assertEquals(0L, curOffset)
+
+      // time goes by; the log file is deleted
+      log.markDeletedWhile(_ => true)
+
+      // we now have a new log; the starting offset of the new log should remain 0
+      assertEquals(curOffset, log.nextAppendOffset)
+    }
+
+    {
+      // second test an empty log segment starting at non-zero
+      val log = new Log(logDir, 100, 1000, false)
+      val numMessages = 1
+      for(i <- 0 until numMessages)
+        log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
+
+      val curOffset = log.nextAppendOffset
+      // time goes by; the log file is deleted
+      log.markDeletedWhile(_ => true)
+
+      // we now have a new log
+      assertEquals(curOffset, log.nextAppendOffset)
+
+      // time goes by; the log file (which is empty) is deleted again
+      log.markDeletedWhile(_ => true)
+
+      // we now have a new log
+      assertEquals(curOffset, log.nextAppendOffset)
+    }
+  }
+
+  /** Fails unless Log.findRange returns a range that contains `offset`. */
+  def assertContains(ranges: Array[Range], offset: Long) = {
+    Log.findRange(ranges, offset) match {
+      case Some(range) =>
+        assertTrue(range + " does not contain " + offset, range.contains(offset))
+      case None => fail("No range found, but expected to find " + offset)
+    }
+  }
+
+  /** Minimal Range implementation defined only by its start and size. */
+  class SimpleRange(val start: Long, val size: Long) extends Range
+
+  /**
+   * Builds consecutive ranges from a list of break points: the first range starts at 0 and
+   * each range ends where the next break point begins.
+   */
+  def makeRanges(breaks: Int*): Array[Range] = {
+    val list = new ArrayList[Range]
+    var prior = 0
+    for(brk <- breaks) {
+      list.add(new SimpleRange(prior, brk - prior))
+      prior = brk
+    }
+    list.toArray(new Array[Range](list.size))
+  }
+
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/SegmentListTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/SegmentListTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/SegmentListTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/SegmentListTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import junit.framework.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Tests for SegmentList: appending, truncating, and the stability of views that were
+ * obtained before a modification.
+ */
+class SegmentListTest extends JUnitSuite {
+
+  /** A view taken before an append keeps the old contents; a new view sees the new element. */
+  @Test
+  def testAppend() {
+    val original = List(1, 2, 3, 4)
+    val segments = new SegmentList(original)
+    val snapshot = segments.view
+    assertEquals(original, snapshot.iterator.toList)
+    segments.append(5)
+    val expectedAfterAppend = original ::: List(5)
+    assertEquals("Appending to both should result in list that are still equals", 
+                 expectedAfterAppend, segments.view.iterator.toList)
+    assertEquals("But the prior view should still equal the original list", original, snapshot.iterator.toList)
+  }
+
+  /** trunc(3) returns the first three elements and leaves the remainder in the list. */
+  @Test
+  def testTrunc() {
+    val front = List(1,2,3)
+    val back = List(4,5,6)
+    val everything = front ::: back
+    val segments = new SegmentList(everything)
+    val snapshot = segments.view
+    assertEquals(everything, snapshot.iterator.toList)
+    val removed = segments.trunc(3)
+    assertEquals(back, segments.view.iterator.toList)
+    assertEquals(front, removed.iterator.toList)
+    assertEquals("View should remain consistent", everything, snapshot.iterator.toList)
+  }
+
+  /** Truncating by more elements than the list holds leaves an empty list. */
+  @Test
+  def testTruncBeyondList() {
+    val segments = new SegmentList(List(1, 2))
+    segments.trunc(3)
+    assertEquals(0, segments.view.length)
+  }
+
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.log4j
+
+import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.{PropertyConfigurator, Logger}
+import java.util.Properties
+import java.io.File
+import kafka.consumer.SimpleConsumer
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.TestUtils
+import kafka.utils.Utils
+import junit.framework.Assert._
+import kafka.api.FetchRequest
+import kafka.serializer.Encoder
+import kafka.message.{MessageSet, Message}
+import kafka.producer.async.MissingConfigException
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+
+/**
+ * Tests for the Kafka log4j appender: validation of its required log4j properties and
+ * delivery of log events to a broker started on brokerPort.
+ */
+class KafkaLog4jAppenderTest extends JUnitSuite {
+
+  var logDir: File = null
+  //  var topicLogDir: File = null
+  var server: KafkaServer = null
+  val brokerPort: Int = 9092
+  var simpleConsumer: SimpleConsumer = null
+  val tLogger = Logger.getLogger(getClass())
+
+  /** Starts a broker on brokerPort and connects a SimpleConsumer to it. */
+  @Before
+  def setUp() {
+    val brokerProps: Properties = createBrokerConfig(1, brokerPort)
+    logDir = new File(brokerProps.getProperty("log.dir"))
+
+    server = TestUtils.createServer(new KafkaConfig(brokerProps))
+    Thread.sleep(100)
+    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
+  }
+
+  /** Closes the consumer, shuts the broker down and removes the broker's log directory. */
+  @After
+  def tearDown() {
+    simpleConsumer.close
+    server.shutdown
+    Thread.sleep(100)
+    Utils.rm(logDir)
+  }
+
+  /**
+   * Configuring log4j without the appender's Port, Host or Topic must raise
+   * MissingConfigException; leaving out only the encoder must not.
+   */
+  @Test
+  def testKafkaLog4jConfigs() {
+    // port missing
+    val withoutPort = commonAppenderProps()
+    withoutPort.put("log4j.appender.KAFKA.Host", "localhost")
+    withoutPort.put("log4j.appender.KAFKA.Topic", "test-topic")
+    withoutPort.put("log4j.appender.KAFKA.encoder", "kafka.log4j.AppenderStringEncoder")
+    expectMissingConfig(withoutPort)
+
+    // host missing
+    val withoutHost = commonAppenderProps()
+    withoutHost.put("log4j.appender.KAFKA.Topic", "test-topic")
+    withoutHost.put("log4j.appender.KAFKA.Encoder", "kafka.log4j.AppenderStringEncoder")
+    withoutHost.put("log4j.appender.KAFKA.Port", "9092")
+    expectMissingConfig(withoutHost)
+
+    // topic missing
+    val withoutTopic = commonAppenderProps()
+    withoutTopic.put("log4j.appender.KAFKA.Host", "localhost")
+    withoutTopic.put("log4j.appender.KAFKA.Port", "9092")
+    withoutTopic.put("log4j.appender.KAFKA.Encoder", "kafka.log4j.AppenderStringEncoder")
+    expectMissingConfig(withoutTopic)
+
+    // serializer missing
+    val withoutEncoder = commonAppenderProps()
+    withoutEncoder.put("log4j.appender.KAFKA.Host", "localhost")
+    withoutEncoder.put("log4j.appender.KAFKA.Topic", "test-topic")
+    withoutEncoder.put("log4j.appender.KAFKA.Port", "9092")
+    try {
+      PropertyConfigurator.configure(withoutEncoder)
+    }catch {
+      case e: MissingConfigException => fail("should default to kafka.producer.DefaultStringEncoder")
+    }
+  }
+
+  /** Five log statements must show up as five messages in "test-topic". */
+  @Test
+  def testLog4jAppends() {
+    PropertyConfigurator.configure(getLog4jConfig)
+    val appenderLogger = Logger.getLogger(classOf[KafkaLog4jAppenderTest])
+
+    for(attempt <- 1 to 5)
+      appenderLogger.info("test")
+
+    Thread.sleep(500)
+
+    var nextOffset = 0L
+    val fetched = simpleConsumer.fetch(new FetchRequest("test-topic", 0, nextOffset, 1024*1024))
+
+    var received = 0
+    for(entry <- fetched) {
+      received += 1
+      nextOffset += entry.offset
+    }
+
+    assertEquals(5, received)
+  }
+
+
+  /** Properties shared by every log4j configuration used in this class. */
+  private def commonAppenderProps(): Properties = {
+    val props = new Properties()
+    props.put("log4j.rootLogger", "INFO")
+    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+    props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
+    props
+  }
+
+  /** Configures log4j with `props` and fails unless a MissingConfigException is raised. */
+  private def expectMissingConfig(props: Properties) {
+    try {
+      PropertyConfigurator.configure(props)
+      fail("Missing properties exception was expected !")
+    }catch {
+      case e: MissingConfigException =>
+    }
+  }
+
+  /** Log4j configuration with Port, Host and Topic set and no encoder. */
+  private def getLog4jConfig: Properties = {
+    val props = commonAppenderProps()
+    props.put("log4j.appender.KAFKA.Port", "9092")
+    props.put("log4j.appender.KAFKA.Host", "localhost")
+    props.put("log4j.appender.KAFKA.Topic", "test-topic")
+    props
+  }
+
+  /**
+   * Builds the broker properties used by setUp().
+   *
+   * @param nodeId value for the "brokerid" property
+   * @param port   value for the "port" property
+   * @return properties with zookeeper disabled and a fresh temp log directory
+   */
+  private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+    val brokerProps = new Properties
+    brokerProps.put("brokerid", nodeId.toString)
+    brokerProps.put("port", port.toString)
+    brokerProps.put("log.dir", getLogDir.getAbsolutePath)
+    brokerProps.put("log.flush.interval", "1")
+    brokerProps.put("enable.zookeeper", "false")
+    brokerProps.put("num.partitions", "1")
+    brokerProps.put("log.retention.hours", "10")
+    brokerProps.put("log.cleanup.interval.mins", "5")
+    brokerProps.put("log.file.size", "1000")
+    brokerProps
+  }
+
+  /** Returns the result of TestUtils.tempDir(). */
+  private def getLogDir(): File = TestUtils.tempDir()
+}
+
+/**
+ * Encoder that turns a log4j LoggingEvent into a Message holding the bytes of the
+ * event's message text.
+ */
+class AppenderStringEncoder extends Encoder[LoggingEvent] {
+  /**
+   * @param event the logging event to encode
+   * @return a Message built from the event message's bytes
+   */
+  def toMessage(event: LoggingEvent):Message = {
+    // getMessage is not necessarily a String; match instead of casting so that a
+    // non-String payload is stringified rather than raising ClassCastException
+    val text = event.getMessage match {
+      case s: String => s
+      case other => String.valueOf(other)
+    }
+    new Message(text.getBytes)
+  }
+}
+