You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/02 01:42:17 UTC
svn commit: r1152970 [22/26] - in /incubator/kafka: branches/ site/ trunk/
trunk/bin/ trunk/clients/ trunk/clients/clojure/
trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/
trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,416 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.integration
+
+import scala.collection._
+import kafka.api.FetchRequest
+import kafka.common.{InvalidPartitionException, OffsetOutOfRangeException}
+import kafka.server.{KafkaRequestHandlers, KafkaConfig}
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.javaapi.ProducerRequest
+import kafka.utils.TestUtils
+import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message}
+
+/**
+ * End to end tests of the primitive apis against a local server
+ */
+class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with kafka.integration.KafkaServerTestHarness {
+
+ val port = 9999
+ val props = TestUtils.createBrokerConfig(0, port)
+ val config = new KafkaConfig(props) {
+ override val enableZookeeper = false
+ }
+ val configs = List(config)
+ val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+
+ def testProduceAndFetch() {
+ // send some messages
+ val topic = "test"
+
+// send an empty messageset first
+ val sent2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(Seq.empty[Message]: _*))
+ producer.send(topic, sent2)
+ Thread.sleep(200)
+ sent2.getBuffer.rewind
+ var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+ TestUtils.checkEquals(sent2.iterator, fetched2.iterator)
+
+
+ // send some messages
+ val sent3 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message("hello".getBytes()),
+ new Message("there".getBytes())))
+ producer.send(topic, sent3)
+
+ Thread.sleep(200)
+ sent3.getBuffer.rewind
+ var fetched3: ByteBufferMessageSet = null
+ while(fetched3 == null || fetched3.validBytes == 0)
+ fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+ TestUtils.checkEquals(sent3.iterator, fetched3.iterator)
+
+ // temporarily set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ // send an invalid offset
+ try {
+ val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
+ fetchedWithError.iterator
+ fail("expect exception")
+ }
+ catch {
+ case e: OffsetOutOfRangeException => "this is good"
+ }
+
+ // restore set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.ERROR)
+ }
+
+ def testProduceAndFetchWithCompression() {
+ // send some messages
+ val topic = "test"
+
+// send an empty messageset first
+ val sent2 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+ messages = getMessageList(Seq.empty[Message]: _*))
+ producer.send(topic, sent2)
+ Thread.sleep(200)
+ sent2.getBuffer.rewind
+ var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+ TestUtils.checkEquals(sent2.iterator, fetched2.iterator)
+
+
+ // send some messages
+ val sent3 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+ messages = getMessageList(new Message("hello".getBytes()),
+ new Message("there".getBytes())))
+ producer.send(topic, sent3)
+
+ Thread.sleep(200)
+ sent3.getBuffer.rewind
+ var fetched3: ByteBufferMessageSet = null
+ while(fetched3 == null || fetched3.validBytes == 0)
+ fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+ TestUtils.checkEquals(sent3.iterator, fetched3.iterator)
+
+ // temporarily set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ // send an invalid offset
+ try {
+ val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
+ fetchedWithError.iterator
+ fail("expect exception")
+ }
+ catch {
+ case e: OffsetOutOfRangeException => "this is good"
+ }
+
+ // restore set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.ERROR)
+ }
+
+ def testProduceAndMultiFetch() {
+ // send some messages
+ val topics = List("test1", "test2", "test3");
+ {
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message(("a_" + topic).getBytes),
+ new Message(("b_" + topic).getBytes)))
+ messages += topic -> set
+ producer.send(topic, set)
+ set.getBuffer.rewind
+ fetches += new FetchRequest(topic, 0, 0, 10000)
+ }
+
+ // wait a bit for produced message to be available
+ Thread.sleep(200)
+ val response = consumer.multifetch(getFetchRequestList(fetches: _*))
+ val iter = response.iterator
+ for(topic <- topics) {
+ if (iter.hasNext) {
+ val resp = iter.next
+ TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+ }
+ else
+ fail("fewer responses than expected")
+ }
+ }
+
+ // temporarily set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ {
+ // send some invalid offsets
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics)
+ fetches += new FetchRequest(topic, 0, -1, 10000)
+
+ try {
+ val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
+ val iter = responses.iterator
+ while (iter.hasNext)
+ iter.next.iterator
+ fail("expect exception")
+ }
+ catch {
+ case e: OffsetOutOfRangeException => "this is good"
+ }
+ }
+
+ {
+ // send some invalid partitions
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics)
+ fetches += new FetchRequest(topic, -1, 0, 10000)
+
+ try {
+ val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
+ val iter = responses.iterator
+ while (iter.hasNext)
+ iter.next.iterator
+ fail("expect exception")
+ }
+ catch {
+ case e: InvalidPartitionException => "this is good"
+ }
+ }
+
+ // restore set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.ERROR)
+ }
+
+ def testProduceAndMultiFetchWithCompression() {
+ // send some messages
+ val topics = List("test1", "test2", "test3");
+ {
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+ messages = getMessageList(new Message(("a_" + topic).getBytes),
+ new Message(("b_" + topic).getBytes)))
+ messages += topic -> set
+ producer.send(topic, set)
+ set.getBuffer.rewind
+ fetches += new FetchRequest(topic, 0, 0, 10000)
+ }
+
+ // wait a bit for produced message to be available
+ Thread.sleep(200)
+ val response = consumer.multifetch(getFetchRequestList(fetches: _*))
+ val iter = response.iterator
+ for(topic <- topics) {
+ if (iter.hasNext) {
+ val resp = iter.next
+ TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+ }
+ else
+ fail("fewer responses than expected")
+ }
+ }
+
+ // temporarily set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ {
+ // send some invalid offsets
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics)
+ fetches += new FetchRequest(topic, 0, -1, 10000)
+
+ try {
+ val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
+ val iter = responses.iterator
+ while (iter.hasNext)
+ iter.next.iterator
+ fail("expect exception")
+ }
+ catch {
+ case e: OffsetOutOfRangeException => "this is good"
+ }
+ }
+
+ {
+ // send some invalid partitions
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics)
+ fetches += new FetchRequest(topic, -1, 0, 10000)
+
+ try {
+ val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
+ val iter = responses.iterator
+ while (iter.hasNext)
+ iter.next.iterator
+ fail("expect exception")
+ }
+ catch {
+ case e: InvalidPartitionException => "this is good"
+ }
+ }
+
+ // restore set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.ERROR)
+ }
+
+ def testProduceAndMultiFetchJava() {
+ // send some messages
+ val topics = List("test1", "test2", "test3");
+ {
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches : java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest]
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message(("a_" + topic).getBytes),
+ new Message(("b_" + topic).getBytes)))
+ messages += topic -> set
+ producer.send(topic, set)
+ set.getBuffer.rewind
+ fetches.add(new FetchRequest(topic, 0, 0, 10000))
+ }
+
+ // wait a bit for produced message to be available
+ Thread.sleep(200)
+ val response = consumer.multifetch(fetches)
+ val iter = response.iterator
+ for(topic <- topics) {
+ if (iter.hasNext) {
+ val resp = iter.next
+ TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+ }
+ else
+ fail("fewer responses than expected")
+ }
+ }
+ }
+
+ def testProduceAndMultiFetchJavaWithCompression() {
+ // send some messages
+ val topics = List("test1", "test2", "test3");
+ {
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches : java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest]
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+ messages = getMessageList(new Message(("a_" + topic).getBytes),
+ new Message(("b_" + topic).getBytes)))
+ messages += topic -> set
+ producer.send(topic, set)
+ set.getBuffer.rewind
+ fetches.add(new FetchRequest(topic, 0, 0, 10000))
+ }
+
+ // wait a bit for produced message to be available
+ Thread.sleep(200)
+ val response = consumer.multifetch(fetches)
+ val iter = response.iterator
+ for(topic <- topics) {
+ if (iter.hasNext) {
+ val resp = iter.next
+ TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+ }
+ else
+ fail("fewer responses than expected")
+ }
+ }
+ }
+
+ def testMultiProduce() {
+ // send some messages
+ val topics = List("test1", "test2", "test3");
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ var produceList: List[ProducerRequest] = Nil
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message(("a_" + topic).getBytes),
+ new Message(("b_" + topic).getBytes)))
+ messages += topic -> set
+ produceList ::= new ProducerRequest(topic, 0, set)
+ fetches += new FetchRequest(topic, 0, 0, 10000)
+ }
+ producer.multiSend(produceList.toArray)
+
+ for (messageSet <- messages.values)
+ messageSet.getBuffer.rewind
+
+ // wait a bit for produced message to be available
+ Thread.sleep(200)
+ val response = consumer.multifetch(getFetchRequestList(fetches: _*))
+ val iter = response.iterator
+ for(topic <- topics) {
+ if (iter.hasNext) {
+ val resp = iter.next
+ TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+ }
+ else
+ fail("fewer responses than expected")
+ }
+ }
+
+ def testMultiProduceWithCompression() {
+ // send some messages
+ val topics = List("test1", "test2", "test3");
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ var produceList: List[ProducerRequest] = Nil
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+ messages = getMessageList(new Message(("a_" + topic).getBytes),
+ new Message(("b_" + topic).getBytes)))
+ messages += topic -> set
+ produceList ::= new ProducerRequest(topic, 0, set)
+ fetches += new FetchRequest(topic, 0, 0, 10000)
+ }
+ producer.multiSend(produceList.toArray)
+
+ for (messageSet <- messages.values)
+ messageSet.getBuffer.rewind
+
+ // wait a bit for produced message to be available
+ Thread.sleep(200)
+ val response = consumer.multifetch(getFetchRequestList(fetches: _*))
+ val iter = response.iterator
+ for(topic <- topics) {
+ if (iter.hasNext) {
+ val resp = iter.next
+ TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+ }
+ else
+ fail("fewer responses than expected")
+ }
+ }
+
+ private def getMessageList(messages: Message*): java.util.List[Message] = {
+ val messageList = new java.util.ArrayList[Message]()
+ messages.foreach(m => messageList.add(m))
+ messageList
+ }
+
+ private def getFetchRequestList(fetches: FetchRequest*): java.util.List[FetchRequest] = {
+ val fetchReqs = new java.util.ArrayList[FetchRequest]()
+ fetches.foreach(f => fetchReqs.add(f))
+ fetchReqs
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.integration
+
+import org.scalatest.junit.JUnit3Suite
+import java.util.Properties
+import kafka.producer.SyncProducerConfig
+import kafka.javaapi.producer.SyncProducer
+import kafka.javaapi.consumer.SimpleConsumer
+
+trait ProducerConsumerTestHarness extends JUnit3Suite {
+
+ val port: Int
+ val host = "localhost"
+ var producer: SyncProducer = null
+ var consumer: SimpleConsumer = null
+
+ override def setUp() {
+ val props = new Properties()
+ props.put("host", host)
+ props.put("port", port.toString)
+ props.put("buffer.size", "65536")
+ props.put("connect.timeout.ms", "100000")
+ props.put("reconnect.interval", "10000")
+ producer = new SyncProducer(new SyncProducerConfig(props))
+ consumer = new SimpleConsumer(host,
+ port,
+ 1000000,
+ 64*1024)
+ super.setUp
+ }
+
+ override def tearDown() {
+ super.tearDown
+ producer.close()
+ consumer.close()
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.message
+
+import junit.framework.Assert._
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+import kafka.utils.TestUtils
+import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, CompressionCodec, Message}
+
+trait BaseMessageSetTestCases extends JUnitSuite {
+
+ val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
+ def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
+ def toMessageIterator(messageSet: MessageSet): Iterator[Message] = {
+ import scala.collection.JavaConversions._
+ val messages = asIterable(messageSet)
+ messages.map(m => m.message).iterator
+ }
+
+ @Test
+ def testWrittenEqualsRead {
+ import scala.collection.JavaConversions._
+ val messageSet = createMessageSet(messages)
+ TestUtils.checkEquals(messages.iterator, toMessageIterator(messageSet))
+ }
+
+ @Test
+ def testIteratorIsConsistent() {
+ import scala.collection.JavaConversions._
+ val m = createMessageSet(messages)
+ // two iterators over the same set should give the same results
+ TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
+ }
+
+ @Test
+ def testIteratorIsConsistentWithCompression() {
+ import scala.collection.JavaConversions._
+ val m = createMessageSet(messages, DefaultCompressionCodec)
+ // two iterators over the same set should give the same results
+ TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
+ }
+
+ @Test
+ def testSizeInBytes() {
+ assertEquals("Empty message set should have 0 bytes.",
+ 0L,
+ createMessageSet(Array[Message]()).sizeInBytes)
+ assertEquals("Predicted size should equal actual size.",
+ kafka.message.MessageSet.messageSetSize(messages).toLong,
+ createMessageSet(messages).sizeInBytes)
+ }
+
+ @Test
+ def testSizeInBytesWithCompression () {
+ assertEquals("Empty message set should have 0 bytes.",
+ 30L, // overhead of the GZIP output stream
+ createMessageSet(Array[Message](), DefaultCompressionCodec).sizeInBytes)
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.message
+
+import java.nio._
+import junit.framework.TestCase
+import junit.framework.Assert._
+import org.junit.Test
+import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
+
+class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestCases {
+
+ override def createMessageSet(messages: Seq[Message],
+ compressed: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet =
+ new ByteBufferMessageSet(compressed, getMessageList(messages: _*))
+
+ @Test
+ def testValidBytes() {
+ val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message("hello".getBytes()),
+ new Message("there".getBytes())))
+ val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
+ buffer.put(messageList.getBuffer)
+ buffer.putShort(4)
+ val messageListPlus = new ByteBufferMessageSet(buffer)
+ assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
+ }
+
+ @Test
+ def testValidBytesWithCompression () {
+ val messageList = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+ messages = getMessageList(new Message("hello".getBytes()),
+ new Message("there".getBytes())))
+ val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
+ buffer.put(messageList.getBuffer)
+ buffer.putShort(4)
+ val messageListPlus = new ByteBufferMessageSet(buffer, 0, 0)
+ assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
+ }
+
+ @Test
+ def testEquals() {
+ val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message("hello".getBytes()),
+ new Message("there".getBytes())))
+ val moreMessages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message("hello".getBytes()),
+ new Message("there".getBytes())))
+
+ assertEquals(messageList, moreMessages)
+ assertTrue(messageList.equals(moreMessages))
+ }
+
+ @Test
+ def testEqualsWithCompression () {
+ val messageList = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+ messages = getMessageList(new Message("hello".getBytes()),
+ new Message("there".getBytes())))
+ val moreMessages = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
+ messages = getMessageList(new Message("hello".getBytes()),
+ new Message("there".getBytes())))
+
+ assertEquals(messageList, moreMessages)
+ assertTrue(messageList.equals(moreMessages))
+ }
+
+ private def getMessageList(messages: Message*): java.util.List[Message] = {
+ val messageList = new java.util.ArrayList[Message]()
+ messages.foreach(m => messageList.add(m))
+ messageList
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,630 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.javaapi.producer
+
+import java.util.Properties
+import org.apache.log4j.{Logger, Level}
+import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import kafka.zk.EmbeddedZookeeper
+import kafka.utils.{TestZKUtils, TestUtils}
+import org.junit.{After, Before, Test}
+import junit.framework.Assert
+import collection.mutable.HashMap
+import org.easymock.EasyMock
+import kafka.utils.Utils
+import java.util.concurrent.ConcurrentHashMap
+import kafka.cluster.Partition
+import kafka.common.{UnavailableProducerException, InvalidPartitionException, InvalidConfigException}
+import org.scalatest.junit.JUnitSuite
+import kafka.producer.{SyncProducerConfig, Partitioner, ProducerConfig, DefaultPartitioner}
+import kafka.producer.ProducerPool
+import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.producer.async.{AsyncProducer, AsyncProducerConfig}
+import kafka.javaapi.Implicits._
+import kafka.serializer.{StringEncoder, Encoder}
+import kafka.javaapi.consumer.SimpleConsumer
+import kafka.api.FetchRequest
+import kafka.message.{NoCompressionCodec, Message}
+
+class ProducerTest extends JUnitSuite {
+ private val topic = "test-topic"
+ private val brokerId1 = 0
+ private val brokerId2 = 1
+ private val port1 = 9092
+ private val port2 = 9093
+ private var server1: KafkaServer = null
+ private var server2: KafkaServer = null
+ private var producer1: SyncProducer = null
+ private var producer2: SyncProducer = null
+ private var consumer1: SimpleConsumer = null
+ private var consumer2: SimpleConsumer = null
+ private var zkServer:EmbeddedZookeeper = null
+ private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+
+ @Before
+ def setUp() {
+ // set up 2 brokers with 4 partitions each
+ zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect)
+
+ val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+ val config1 = new KafkaConfig(props1) {
+ override val numPartitions = 4
+ }
+ server1 = TestUtils.createServer(config1)
+
+ val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+ val config2 = new KafkaConfig(props2) {
+ override val numPartitions = 4
+ }
+ server2 = TestUtils.createServer(config2)
+
+ val props = new Properties()
+ props.put("host", "localhost")
+ props.put("port", port1.toString)
+
+ producer1 = new SyncProducer(new SyncProducerConfig(props))
+ val messages1 = new java.util.ArrayList[Message]
+ messages1.add(new Message("test".getBytes()))
+ producer1.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messages1))
+
+ producer2 = new SyncProducer(new SyncProducerConfig(props) {
+ override val port = port2
+ })
+ val messages2 = new java.util.ArrayList[Message]
+ messages2.add(new Message("test".getBytes()))
+
+ producer2.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messages2))
+
+ consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024)
+ consumer2 = new SimpleConsumer("localhost", port2, 1000000, 64*1024)
+
+ // temporarily set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ Thread.sleep(500)
+ }
+
+ @After
+ def tearDown() {
+ // restore set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.ERROR)
+ server1.shutdown
+ server2.shutdown
+ Utils.rm(server1.config.logDir)
+ Utils.rm(server2.config.logDir)
+ Thread.sleep(500)
+ zkServer.shutdown
+ }
+
+ @Test
+ def testSend() {
+ val props = new Properties()
+ props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+ props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+
+ val config = new ProducerConfig(props)
+ val partitioner = new StaticPartitioner
+ val serializer = new StringSerializer
+
+ // 2 sync producers
+ val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+ val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+ val syncProducer2 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+ // it should send to partition 0 (first partition) on second broker i.e broker2
+ val messageList = new java.util.ArrayList[Message]
+ messageList.add(new Message("test1".getBytes()))
+ syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+ EasyMock.expectLastCall
+ syncProducer1.close
+ EasyMock.expectLastCall
+ syncProducer2.close
+ EasyMock.expectLastCall
+ EasyMock.replay(syncProducer1)
+ EasyMock.replay(syncProducer2)
+
+ syncProducers.put(brokerId1, syncProducer1)
+ syncProducers.put(brokerId2, syncProducer2)
+
+ val producerPool = new ProducerPool[String](config, serializer, syncProducers,
+ new ConcurrentHashMap[Int, AsyncProducer[String]]())
+ val producer = new Producer[String, String](config, partitioner, producerPool, false)
+
+ val messagesContent = new java.util.ArrayList[String]
+ messagesContent.add("test1")
+ producer.send(new ProducerData[String, String](topic, "test", messagesContent))
+ producer.close
+
+ EasyMock.verify(syncProducer1)
+ EasyMock.verify(syncProducer2)
+ }
+
+ @Test
+ def testSendSingleMessage() {
+ val props = new Properties()
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("broker.list", "0:localhost:9092")
+
+
+ val config = new ProducerConfig(props)
+ val partitioner = new StaticPartitioner
+ val serializer = new StringSerializer
+
+ // 2 sync producers
+ val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+ val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+ // it should send to random partition on broker 1
+ val messageList = new java.util.ArrayList[Message]
+ messageList.add(new Message("t".getBytes()))
+ syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+ EasyMock.expectLastCall
+ syncProducer1.close
+ EasyMock.expectLastCall
+ EasyMock.replay(syncProducer1)
+
+ syncProducers.put(brokerId1, syncProducer1)
+
+ val producerPool = new ProducerPool[String](config, serializer, syncProducers,
+ new ConcurrentHashMap[Int, AsyncProducer[String]]())
+ val producer = new Producer[String, String](config, partitioner, producerPool, false)
+
+ producer.send(new ProducerData[String, String](topic, "t"))
+ producer.close
+
+ EasyMock.verify(syncProducer1)
+ }
+
+ @Test
+ def testInvalidPartition() {
+ val props = new Properties()
+ props.put("partitioner.class", "kafka.producer.NegativePartitioner")
+ props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
+ val config = new ProducerConfig(props)
+
+ val richProducer = new Producer[String, String](config)
+ val messagesContent = new java.util.ArrayList[String]
+ messagesContent.add("test")
+ try {
+ richProducer.send(new ProducerData[String, String](topic, "test", messagesContent))
+ Assert.fail("Should fail with InvalidPartitionException")
+ }catch {
+ case e: InvalidPartitionException => // expected, do nothing
+ }
+ }
+
+ @Test
+ def testSyncProducerPool() {
+ // 2 sync producers
+ val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+ val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+ val syncProducer2 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+ val messageList = new java.util.ArrayList[Message]
+ messageList.add(new Message("test1".getBytes()))
+ syncProducer1.send("test-topic", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+ EasyMock.expectLastCall
+ syncProducer1.close
+ EasyMock.expectLastCall
+ syncProducer2.close
+ EasyMock.expectLastCall
+ EasyMock.replay(syncProducer1)
+ EasyMock.replay(syncProducer2)
+
+ syncProducers.put(brokerId1, syncProducer1)
+ syncProducers.put(brokerId2, syncProducer2)
+
+ // default for producer.type is "sync"
+ val props = new Properties()
+ props.put("partitioner.class", "kafka.producer.NegativePartitioner")
+ props.put("serializer.class", "kafka.producer.StringSerializer")
+ val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
+ syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+ producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
+
+ producerPool.close
+ EasyMock.verify(syncProducer1)
+ EasyMock.verify(syncProducer2)
+ }
+
+ @Test
+ def testAsyncProducerPool() {
+ // 2 async producers
+ val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+ val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
+ val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
+ asyncProducer1.send(topic, "test1", 0)
+ EasyMock.expectLastCall
+ asyncProducer1.close
+ EasyMock.expectLastCall
+ asyncProducer2.close
+ EasyMock.expectLastCall
+ EasyMock.replay(asyncProducer1)
+ EasyMock.replay(asyncProducer2)
+
+ asyncProducers.put(brokerId1, asyncProducer1)
+ asyncProducers.put(brokerId2, asyncProducer2)
+
+ // change producer.type to "async"
+ val props = new Properties()
+ props.put("partitioner.class", "kafka.producer.NegativePartitioner")
+ props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("producer.type", "async")
+ val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
+ new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers)
+ producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
+
+ producerPool.close
+ EasyMock.verify(asyncProducer1)
+ EasyMock.verify(asyncProducer2)
+ }
+
+ @Test
+ def testSyncUnavailableProducerException() {
+ val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+ val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+ val syncProducer2 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+ syncProducer2.close
+ EasyMock.expectLastCall
+ EasyMock.replay(syncProducer1)
+ EasyMock.replay(syncProducer2)
+
+ syncProducers.put(brokerId2, syncProducer2)
+
+ // default for producer.type is "sync"
+ val props = new Properties()
+ props.put("partitioner.class", "kafka.producer.NegativePartitioner")
+ props.put("serializer.class", "kafka.producer.StringSerializer")
+ val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
+ syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+ try {
+ producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
+ Assert.fail("Should fail with UnavailableProducerException")
+ }catch {
+ case e: UnavailableProducerException => // expected
+ }
+
+ producerPool.close
+ EasyMock.verify(syncProducer1)
+ EasyMock.verify(syncProducer2)
+ }
+
+ @Test
+ def testAsyncUnavailableProducerException() {
+ val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+ val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
+ val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
+ asyncProducer2.close
+ EasyMock.expectLastCall
+ EasyMock.replay(asyncProducer1)
+ EasyMock.replay(asyncProducer2)
+
+ asyncProducers.put(brokerId2, asyncProducer2)
+
+ // change producer.type to "async"
+ val props = new Properties()
+ props.put("partitioner.class", "kafka.producer.NegativePartitioner")
+ props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("producer.type", "async")
+ val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
+ new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers)
+ try {
+ producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
+ Assert.fail("Should fail with UnavailableProducerException")
+ }catch {
+ case e: UnavailableProducerException => // expected
+ }
+
+ producerPool.close
+ EasyMock.verify(asyncProducer1)
+ EasyMock.verify(asyncProducer2)
+ }
+
+ @Test
+ def testConfigBrokerPartitionInfoWithPartitioner {
+ val props = new Properties()
+ props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+ props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("producer.type", "async")
+ props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1 + ":" + 4 + "," +
+ brokerId2 + ":" + "localhost" + ":" + port2 + ":" + 4)
+
+ var config: ProducerConfig = null
+ try {
+ config = new ProducerConfig(props)
+ fail("should fail with InvalidConfigException due to presence of partitioner.class and broker.list")
+ }catch {
+ case e: InvalidConfigException => // expected
+ }
+ }
+
+ @Test
+ def testConfigBrokerPartitionInfo() {
+ val props = new Properties()
+ props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("producer.type", "async")
+ props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
+
+ val config = new ProducerConfig(props)
+ val partitioner = new StaticPartitioner
+ val serializer = new StringSerializer
+
+ // 2 async producers
+ val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+ val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
+ // it should send to partition 0 (first partition) on second broker i.e broker2
+ asyncProducer1.send(topic, "test1", -1)
+ EasyMock.expectLastCall
+ asyncProducer1.close
+ EasyMock.expectLastCall
+ EasyMock.replay(asyncProducer1)
+
+ asyncProducers.put(brokerId1, asyncProducer1)
+
+ val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, kafka.producer.SyncProducer](),
+ asyncProducers)
+ val producer = new Producer[String, String](config, partitioner, producerPool, false)
+
+ val messagesContent = new java.util.ArrayList[String]
+ messagesContent.add("test1")
+ producer.send(new ProducerData[String, String](topic, "test1", messagesContent))
+ producer.close
+
+ EasyMock.verify(asyncProducer1)
+ }
+
+ @Test
+ def testZKSendToNewTopic() {
+ val props = new Properties()
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+ val config = new ProducerConfig(props)
+ val serializer = new StringEncoder
+
+ val producer = new Producer[String, String](config)
+ try {
+ import scala.collection.JavaConversions._
+ producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1"))))
+ Thread.sleep(100)
+ producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1"))))
+ Thread.sleep(100)
+ // cross check if brokers got the messages
+ val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+ Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
+ Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+ val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+ Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
+ Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message)
+ } catch {
+ case e: Exception => fail("Not expected")
+ }
+ producer.close
+ }
+
+ @Test
+ def testZKSendWithDeadBroker() {
+ val props = new Properties()
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+ val config = new ProducerConfig(props)
+ val serializer = new StringEncoder
+
+ val producer = new Producer[String, String](config)
+ try {
+ import scala.collection.JavaConversions._
+ producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1"))))
+ Thread.sleep(100)
+ // kill 2nd broker
+ server2.shutdown
+ Thread.sleep(100)
+ producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1"))))
+ Thread.sleep(100)
+ // cross check if brokers got the messages
+ val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+ Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
+ Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+ Assert.assertTrue("Message set should have another message", messageSet1.hasNext)
+ Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+ } catch {
+ case e: Exception => fail("Not expected")
+ }
+ producer.close
+ }
+
+ @Test
+ def testPartitionedSendToNewTopic() {
+ val props = new Properties()
+ props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+ props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+ val config = new ProducerConfig(props)
+ val partitioner = new StaticPartitioner
+ val serializer = new StringEncoder
+
+ // 2 sync producers
+ val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+ val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+ val syncProducer2 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+ import scala.collection.JavaConversions._
+ syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = asList(Array(new Message("test1".getBytes)))))
+ EasyMock.expectLastCall
+ syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = asList(Array(new Message("test1".getBytes)))))
+ EasyMock.expectLastCall
+ syncProducer1.close
+ EasyMock.expectLastCall
+ syncProducer2.close
+ EasyMock.expectLastCall
+ EasyMock.replay(syncProducer1)
+ EasyMock.replay(syncProducer2)
+
+ syncProducers.put(brokerId1, syncProducer1)
+ syncProducers.put(brokerId2, syncProducer2)
+
+ val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+ val producer = new Producer[String, String](config, partitioner, producerPool, false)
+
+ producer.send(new ProducerData[String, String]("test-topic1", "test", asList(Array("test1"))))
+ Thread.sleep(100)
+
+ // now send again to this topic using a real producer, this time all brokers would have registered
+ // their partitions in zookeeper
+ producer1.send("test-topic1", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = asList(Array(new Message("test".getBytes())))))
+ Thread.sleep(100)
+
+ // wait for zookeeper to register the new topic
+ producer.send(new ProducerData[String, String]("test-topic1", "test1", asList(Array("test1"))))
+ Thread.sleep(100)
+ producer.close
+
+ EasyMock.verify(syncProducer1)
+ EasyMock.verify(syncProducer2)
+ }
+
+ @Test
+ def testPartitionedSendToNewBrokerInExistingTopic() {
+ val props = new Properties()
+ props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+ props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+ val config = new ProducerConfig(props)
+ val partitioner = new StaticPartitioner
+ val serializer = new StringSerializer
+
+ // 2 sync producers
+ val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
+ val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+ val syncProducer2 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+ val syncProducer3 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
+ val messages1 = new java.util.ArrayList[Message]
+ messages1.add(new Message("test1".getBytes()))
+ syncProducer3.send("test-topic", 2, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messages1))
+ EasyMock.expectLastCall
+ syncProducer1.close
+ EasyMock.expectLastCall
+ syncProducer2.close
+ EasyMock.expectLastCall
+ syncProducer3.close
+ EasyMock.expectLastCall
+ EasyMock.replay(syncProducer1)
+ EasyMock.replay(syncProducer2)
+ EasyMock.replay(syncProducer3)
+
+ syncProducers.put(brokerId1, syncProducer1)
+ syncProducers.put(brokerId2, syncProducer2)
+ syncProducers.put(2, syncProducer3)
+
+ val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+ val producer = new Producer[String, String](config, partitioner, producerPool, false)
+
+ val serverProps = TestUtils.createBrokerConfig(2, 9094)
+ val serverConfig = new KafkaConfig(serverProps) {
+ override val numPartitions = 4
+ }
+ val server3 = TestUtils.createServer(serverConfig)
+
+ // send a message to the new broker to register it under topic "test-topic"
+ val tempProps = new Properties()
+ tempProps.put("host", "localhost")
+ tempProps.put("port", "9094")
+ val tempProducer = new kafka.producer.SyncProducer(new SyncProducerConfig(tempProps))
+ val messageList = new java.util.ArrayList[Message]
+ messageList.add(new Message("test".getBytes()))
+ tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+
+ Thread.sleep(500)
+
+ val messagesContent = new java.util.ArrayList[String]
+ messagesContent.add("test1")
+ producer.send(new ProducerData[String, String]("test-topic", "test-topic", messagesContent))
+ producer.close
+
+ EasyMock.verify(syncProducer1)
+ EasyMock.verify(syncProducer2)
+ EasyMock.verify(syncProducer3)
+
+ server3.shutdown
+ Utils.rm(server3.config.logDir)
+ }
+
+ @Test
+ def testDefaultPartitioner() {
+ val props = new Properties()
+ props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("producer.type", "async")
+ props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
+ val config = new ProducerConfig(props)
+ val partitioner = new DefaultPartitioner[String]
+ val serializer = new StringSerializer
+
+ // 2 async producers
+ val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+ val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
+ val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
+ // it should send to partition 0 (first partition) on second broker i.e broker2
+ asyncProducer1.send(topic, "test1", -1)
+ EasyMock.expectLastCall
+ asyncProducer1.close
+ EasyMock.expectLastCall
+ EasyMock.replay(asyncProducer1)
+
+ asyncProducers.put(brokerId1, asyncProducer1)
+
+ val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, kafka.producer.SyncProducer](),
+ asyncProducers)
+ val producer = new Producer[String, String](config, partitioner, producerPool, false)
+
+ val messagesContent = new java.util.ArrayList[String]
+ messagesContent.add("test1")
+ producer.send(new ProducerData[String, String](topic, "test", messagesContent))
+ producer.close
+
+ EasyMock.verify(asyncProducer1)
+ }
+}
+
+class StringSerializer extends Encoder[String] {
+ def toEvent(message: Message):String = message.toString
+ def toMessage(event: String):Message = new Message(event.getBytes)
+ def getTopic(event: String): String = event.concat("-topic")
+}
+
+class NegativePartitioner extends Partitioner[String] {
+ def partition(data: String, numPartitions: Int): Int = {
+ -1
+ }
+}
+
+class StaticPartitioner extends Partitioner[String] {
+ def partition(data: String, numPartitions: Int): Int = {
+ (data.length % numPartitions)
+ }
+}
+
+class HashPartitioner extends Partitioner[String] {
+ def partition(data: String, numPartitions: Int): Int = {
+ (data.hashCode % numPartitions)
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.producer
+
+import junit.framework.{Assert, TestCase}
+import kafka.utils.SystemTime
+import kafka.utils.TestUtils
+import kafka.server.{KafkaServer, KafkaConfig}
+import org.apache.log4j.{Logger, Level}
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import java.util.Properties
+import kafka.producer.SyncProducerConfig
+import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.javaapi.ProducerRequest
+import kafka.message.{NoCompressionCodec, Message}
+
+class SyncProducerTest extends JUnitSuite {
+ private var messageBytes = new Array[Byte](2);
+ private var server: KafkaServer = null
+ val simpleProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
+
+ @Before
+ def setUp() {
+ server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, 9092))
+ {
+ override val enableZookeeper = false
+ })
+ }
+
+ @After
+ def tearDown() {
+ server.shutdown
+ }
+
+ @Test
+ def testUnreachableServer() {
+ val props = new Properties()
+ props.put("host", "NOT_USED")
+ props.put("port", "9092")
+ props.put("buffer.size", "102400")
+ props.put("connect.timeout.ms", "300")
+ props.put("reconnect.interval", "1000")
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+ var failed = false
+ val firstStart = SystemTime.milliseconds
+
+ //temporarily increase log4j level to avoid error in output
+ simpleProducerLogger.setLevel(Level.FATAL)
+ try {
+ producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message(messageBytes))))
+ }catch {
+ case e: Exception => failed = true
+ }
+ Assert.assertTrue(failed)
+ failed = false
+ val firstEnd = SystemTime.milliseconds
+ println("First message send retries took " + (firstEnd-firstStart) + " ms")
+ Assert.assertTrue((firstEnd-firstStart) < 300)
+
+ val secondStart = SystemTime.milliseconds
+ try {
+ producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message(messageBytes))))
+ }catch {
+ case e: Exception => failed = true
+
+ }
+ val secondEnd = SystemTime.milliseconds
+ println("Second message send retries took " + (secondEnd-secondStart) + " ms")
+ Assert.assertTrue((secondEnd-secondEnd) < 300)
+ simpleProducerLogger.setLevel(Level.ERROR)
+ }
+
+ @Test
+ def testReachableServer() {
+ val props = new Properties()
+ props.put("host", "localhost")
+ props.put("port", "9092")
+ props.put("buffer.size", "102400")
+ props.put("connect.timeout.ms", "500")
+ props.put("reconnect.interval", "1000")
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+ var failed = false
+ val firstStart = SystemTime.milliseconds
+ try {
+ producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message(messageBytes))))
+ }catch {
+ case e: Exception => failed=true
+ }
+ Assert.assertFalse(failed)
+ failed = false
+ val firstEnd = SystemTime.milliseconds
+ Assert.assertTrue((firstEnd-firstStart) < 500)
+ val secondStart = SystemTime.milliseconds
+ try {
+ producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message(messageBytes))))
+ }catch {
+ case e: Exception => failed = true
+ }
+ Assert.assertFalse(failed)
+ val secondEnd = SystemTime.milliseconds
+ Assert.assertTrue((secondEnd-secondEnd) < 500)
+
+ try {
+ producer.multiSend(Array(new ProducerRequest("test", 0,
+ new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message(messageBytes))))))
+ }catch {
+ case e: Exception => failed=true
+ }
+ Assert.assertFalse(failed)
+ }
+
+ @Test
+ def testReachableServerWrongPort() {
+ val props = new Properties()
+ props.put("host", "localhost")
+ props.put("port", "9091")
+ props.put("buffer.size", "102400")
+ props.put("connect.timeout.ms", "300")
+ props.put("reconnect.interval", "500")
+ val producer = new SyncProducer(new SyncProducerConfig(props))
+ var failed = false
+ val firstStart = SystemTime.milliseconds
+ //temporarily increase log4j level to avoid error in output
+ simpleProducerLogger.setLevel(Level.FATAL)
+ try {
+ producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message(messageBytes))))
+ }catch {
+ case e: Exception => failed = true
+ }
+ Assert.assertTrue(failed)
+ failed = false
+ val firstEnd = SystemTime.milliseconds
+ Assert.assertTrue((firstEnd-firstStart) < 300)
+ val secondStart = SystemTime.milliseconds
+ try {
+ producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+ messages = getMessageList(new Message(messageBytes))))
+ }catch {
+ case e: Exception => failed = true
+ }
+ Assert.assertTrue(failed)
+ val secondEnd = SystemTime.milliseconds
+ Assert.assertTrue((secondEnd-secondEnd) < 300)
+ simpleProducerLogger.setLevel(Level.ERROR)
+ }
+
+ private def getMessageList(message: Message): java.util.List[Message] = {
+ val messageList = new java.util.ArrayList[Message]()
+ messageList.add(message)
+ messageList
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io._
+import junit.framework.Assert._
+import kafka.server.KafkaConfig
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import kafka.utils.{Utils, MockTime, TestUtils}
+import kafka.common.OffsetOutOfRangeException
+
+class LogManagerTest extends JUnitSuite {
+
+ val time: MockTime = new MockTime()
+ val maxLogAge = 1000
+ var logDir: File = null
+ var logManager: LogManager = null
+ var config:KafkaConfig = null
+
+ @Before
+ def setUp() {
+ val props = TestUtils.createBrokerConfig(0, -1)
+ config = new KafkaConfig(props) {
+ override val logFileSize = 1024
+ override val enableZookeeper = false
+ }
+ logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+ logManager.startup
+ logDir = logManager.logDir
+ }
+
+ @After
+ def tearDown() {
+ logManager.close()
+ Utils.rm(logDir)
+ }
+
+ @Test
+ def testCreateLog() {
+ val log = logManager.getOrCreateLog("kafka", 0)
+ log.append(TestUtils.singleMessageSet("test".getBytes()))
+ }
+
+
+ @Test
+ def testCleanup() {
+ val log = logManager.getOrCreateLog("cleanup", 0)
+ var offset = 0L
+ for(i <- 0 until 1000) {
+ var set = TestUtils.singleMessageSet("test".getBytes())
+ log.append(set)
+ offset += set.sizeInBytes
+ }
+ log.flush
+ // Why this sleep is required ? File system takes some time to update the last modified time for a file.
+ // TODO: What is unknown is why 1 second or couple 100 milliseconds didn't work ?
+ Thread.sleep(2000)
+ assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
+ time.currentMs += maxLogAge + 3000
+ logManager.cleanupLogs()
+ assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
+ assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
+ try {
+ log.read(0, 1024)
+ fail("Should get exception from fetching earlier.")
+ } catch {
+ case e: OffsetOutOfRangeException => "This is good."
+ }
+ // log should still be appendable
+ log.append(TestUtils.singleMessageSet("test".getBytes()))
+ }
+
+ @Test
+ def testTimeBasedFlush() {
+ val props = TestUtils.createBrokerConfig(0, -1)
+ logManager.close
+ Thread.sleep(100)
+ config = new KafkaConfig(props) {
+ override val logFileSize = 1024 *1024 *1024
+ override val enableZookeeper = false
+ override val flushSchedulerThreadRate = 50
+ override val flushInterval = Int.MaxValue
+ override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
+ }
+ logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+ logManager.startup
+ val log = logManager.getOrCreateLog("timebasedflush", 0)
+ for(i <- 0 until 200) {
+ var set = TestUtils.singleMessageSet("test".getBytes())
+ log.append(set)
+ }
+
+ assertTrue("The last flush time has to be within defaultflushInterval of current time ",
+ (System.currentTimeMillis - log.getLastFlushedTime) < 100)
+ }
+
+ @Test
+ def testConfigurablePartitions() {
+ val props = TestUtils.createBrokerConfig(0, -1)
+ logManager.close
+ Thread.sleep(100)
+ config = new KafkaConfig(props) {
+ override val logFileSize = 256
+ override val enableZookeeper = false
+ override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
+ }
+
+ logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+ logManager.startup
+
+ for(i <- 0 until 2) {
+ val log = logManager.getOrCreateLog("testPartition", i)
+ for(i <- 0 until 250) {
+ var set = TestUtils.singleMessageSet("test".getBytes())
+ log.append(set)
+ }
+ }
+
+ try
+ {
+ val log = logManager.getOrCreateLog("testPartition", 2)
+ assertTrue("Should not come here", log != null)
+ } catch {
+ case _ =>
+ }
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import junit.framework.TestCase
+import java.io.File
+import kafka.utils.TestUtils
+import kafka.utils.Utils
+import kafka.server.{KafkaConfig, KafkaServer}
+import junit.framework.Assert._
+import java.util.{Random, Properties}
+import kafka.api.{FetchRequest, OffsetRequest}
+import collection.mutable.WrappedArray
+import kafka.consumer.SimpleConsumer
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+
+object LogOffsetTest {
+ val random = new Random()
+}
+
+class LogOffsetTest extends JUnitSuite {
+ var logDir: File = null
+ var topicLogDir: File = null
+ var server: KafkaServer = null
+ var logSize: Int = 100
+ val brokerPort: Int = 9099
+ var simpleConsumer: SimpleConsumer = null
+
+ @Before
+ def setUp() {
+ val config: Properties = createBrokerConfig(1, brokerPort)
+ val logDirPath = config.getProperty("log.dir")
+ logDir = new File(logDirPath)
+
+ server = TestUtils.createServer(new KafkaConfig(config))
+ simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
+ }
+
+ @After
+ def tearDown() {
+ simpleConsumer.close
+ server.shutdown
+ Utils.rm(logDir)
+ }
+
+ @Test
+ def testEmptyLogs() {
+ val messageSet: ByteBufferMessageSet = simpleConsumer.fetch(
+ new FetchRequest("test", 0, 0, 300 * 1024))
+ assertFalse(messageSet.iterator.hasNext)
+
+ {
+ val offsets = simpleConsumer.getOffsetsBefore("test", 0, OffsetRequest.LatestTime, 10)
+ assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+ }
+
+ {
+ val offsets = simpleConsumer.getOffsetsBefore("test", 0, OffsetRequest.EarliestTime, 10)
+ assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+ }
+
+ {
+ val offsets = simpleConsumer.getOffsetsBefore("test", 0, 1295978400000L, 10)
+ assertTrue( 0 == offsets.length )
+ }
+
+ }
+
+ @Test
+ def testGetOffsetsBeforeLatestTime() {
+ val topicPartition = "kafka-" + 0
+ val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
+ val topic = topicPartition.split("-").head
+ val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+ val logManager = server.getLogManager
+ val log = logManager.getOrCreateLog(topic, part)
+
+ val message = new Message(Integer.toString(42).getBytes())
+ for(i <- 0 until 20)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+ log.flush()
+
+ Thread.sleep(100)
+
+ val offsetRequest = new OffsetRequest(topic, part, OffsetRequest.LatestTime, 10)
+
+ val offsets = log.getOffsetsBefore(offsetRequest)
+ assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+
+ val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
+ OffsetRequest.LatestTime, 10)
+ assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
+
+ // try to fetch using latest offset
+ val messageSet: ByteBufferMessageSet = simpleConsumer.fetch(
+ new FetchRequest(topic, 0, consumerOffsets.head, 300 * 1024))
+ assertFalse(messageSet.iterator.hasNext)
+ }
+
+ @Test
+ def testEmptyLogsGetOffsets() {
+ val topicPartition = "kafka-" + LogOffsetTest.random.nextInt(10)
+ val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
+ topicLogDir = new File(topicPartitionPath)
+ topicLogDir.mkdir
+
+ val topic = topicPartition.split("-").head
+ val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+ var offsetChanged = false
+ for(i <- 1 to 14) {
+ val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
+ OffsetRequest.EarliestTime, 1)
+
+ if(consumerOffsets(0) == 1) {
+ offsetChanged = true
+ }
+ }
+ assertFalse(offsetChanged)
+ }
+
+ @Test
+ def testGetOffsetsBeforeNow() {
+ val topicPartition = "kafka-" + LogOffsetTest.random.nextInt(10)
+ val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
+ val topic = topicPartition.split("-").head
+ val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+ val logManager = server.getLogManager
+ val log = logManager.getOrCreateLog(topic, part)
+ val message = new Message(Integer.toString(42).getBytes())
+ for(i <- 0 until 20)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+ log.flush()
+
+ val now = System.currentTimeMillis
+ Thread.sleep(100)
+
+ val offsetRequest = new OffsetRequest(topic, part, now, 10)
+ val offsets = log.getOffsetsBefore(offsetRequest)
+ assertTrue((Array(216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+
+ val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10)
+ assertTrue((Array(216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
+ }
+
+ @Test
+ def testGetOffsetsBeforeEarliestTime() {
+ val topicPartition = "kafka-" + LogOffsetTest.random.nextInt(10)
+ val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
+ val topic = topicPartition.split("-").head
+ val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+ val logManager = server.getLogManager
+ val log = logManager.getOrCreateLog(topic, part)
+ val message = new Message(Integer.toString(42).getBytes())
+ for(i <- 0 until 20)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+ log.flush()
+
+ Thread.sleep(100)
+
+ val offsetRequest = new OffsetRequest(topic, part,
+ OffsetRequest.EarliestTime, 10)
+ val offsets = log.getOffsetsBefore(offsetRequest)
+
+ assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+
+ val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
+ OffsetRequest.EarliestTime, 10)
+ assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+ }
+
+ private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+ val props = new Properties
+ props.put("brokerid", nodeId.toString)
+ props.put("port", port.toString)
+ props.put("log.dir", getLogDir.getAbsolutePath)
+ props.put("log.flush.interval", "1")
+ props.put("enable.zookeeper", "false")
+ props.put("num.partitions", "20")
+ props.put("log.retention.hours", "10")
+ props.put("log.cleanup.interval.mins", "5")
+ props.put("log.file.size", logSize.toString)
+ props
+ }
+
+ private def getLogDir(): File = {
+ val dir = TestUtils.tempDir()
+ dir
+ }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io._
+import java.nio._
+import java.util.ArrayList
+import junit.framework.Assert._
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import kafka.utils.{Utils, TestUtils, Range}
+import kafka.common.OffsetOutOfRangeException
+import kafka.message.{NoCompressionCodec, MessageSet, ByteBufferMessageSet, Message}
+
+class LogTest extends JUnitSuite {
+
+ var logDir: File = null
+
+ @Before
+ def setUp() {
+ logDir = TestUtils.tempDir()
+ }
+
+ @After
+ def tearDown() {
+ Utils.rm(logDir)
+ }
+
+ def createEmptyLogs(dir: File, offsets: Int*) = {
+ for(offset <- offsets)
+ new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile()
+ }
+
+ @Test
+ def testLoadEmptyLog() {
+ createEmptyLogs(logDir, 0)
+ new Log(logDir, 1024, 1000, false)
+ }
+
+ @Test
+ def testLoadInvalidLogsFails() {
+ createEmptyLogs(logDir, 0, 15)
+ try {
+ new Log(logDir, 1024, 1000, false)
+ fail("Allowed load of corrupt logs without complaint.")
+ } catch {
+ case e: IllegalStateException => "This is good"
+ }
+ }
+
+ @Test
+ def testAppendAndRead() {
+ val log = new Log(logDir, 1024, 1000, false)
+ val message = new Message(Integer.toString(42).getBytes())
+ for(i <- 0 until 10)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+ log.flush()
+ val messages = log.read(0, 1024)
+ var current = 0
+ for(curr <- messages) {
+ assertEquals("Read message should equal written", message, curr.message)
+ current += 1
+ }
+ assertEquals(10, current)
+ }
+
+ @Test
+ def testReadOutOfRange() {
+ createEmptyLogs(logDir, 1024)
+ val log = new Log(logDir, 1024, 1000, false)
+ assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
+ try {
+ log.read(0, 1024)
+ fail("Expected exception on invalid read.")
+ } catch {
+ case e: OffsetOutOfRangeException => "This is good."
+ }
+ try {
+ log.read(1025, 1000)
+ fail("Expected exception on invalid read.")
+ } catch {
+ case e: OffsetOutOfRangeException => "This is good."
+ }
+ }
+
+ /** Test that writing and reading beyond the log size boundary works */
+ @Test
+ def testLogRolls() {
+ /* create a multipart log with 100 messages */
+ val log = new Log(logDir, 100, 1000, false)
+ val numMessages = 100
+ for(i <- 0 until numMessages)
+ log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
+ log.flush
+
+ /* now do successive reads and iterate over the resulting message sets counting the messages
+ * we should find exact 100 messages.
+ */
+ var reads = 0
+ var current = 0
+ var offset = 0L
+ var readOffset = 0L
+ while(current < numMessages) {
+ val messages = log.read(readOffset, 1024*1024)
+ readOffset += messages.last.offset
+ current += messages.size
+ if(reads > 2*numMessages)
+ fail("Too many read attempts.")
+ reads += 1
+ }
+ assertEquals("We did not find all the messages we put in", numMessages, current)
+ }
+
+ @Test
+ def testFindSegment() {
+ assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45))
+ assertEquals("Search in segment list just outside the range of the last segment should find nothing",
+ None, Log.findRange(makeRanges(5, 9, 12), 12))
+ try {
+ Log.findRange(makeRanges(35), 36)
+ fail("expect exception")
+ }
+ catch {
+ case e: OffsetOutOfRangeException => "this is good"
+ }
+
+ try {
+ Log.findRange(makeRanges(35,35), 36)
+ }
+ catch {
+ case e: OffsetOutOfRangeException => "this is good"
+ }
+
+ assertContains(makeRanges(5, 9, 12), 11)
+ assertContains(makeRanges(5), 4)
+ assertContains(makeRanges(5,8), 5)
+ assertContains(makeRanges(5,8), 6)
+ }
+
+ /** Test corner cases of rolling logs */
+ @Test
+ def testEdgeLogRolls() {
+ {
+ // first test a log segment starting at 0
+ val log = new Log(logDir, 100, 1000, false)
+ val curOffset = log.nextAppendOffset
+ assertEquals(curOffset, 0)
+
+ // time goes by; the log file is deleted
+ log.markDeletedWhile(_ => true)
+
+ // we now have a new log; the starting offset of the new log should remain 0
+ assertEquals(curOffset, log.nextAppendOffset)
+ }
+
+ {
+ // second test an empty log segment starting at none-zero
+ val log = new Log(logDir, 100, 1000, false)
+ val numMessages = 1
+ for(i <- 0 until numMessages)
+ log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
+
+ val curOffset = log.nextAppendOffset
+ // time goes by; the log file is deleted
+ log.markDeletedWhile(_ => true)
+
+ // we now have a new log
+ assertEquals(curOffset, log.nextAppendOffset)
+
+ // time goes by; the log file (which is empty) is deleted again
+ log.markDeletedWhile(_ => true)
+
+ // we now have a new log
+ assertEquals(curOffset, log.nextAppendOffset)
+ }
+ }
+
+ def assertContains(ranges: Array[Range], offset: Long) = {
+ Log.findRange(ranges, offset) match {
+ case Some(range) =>
+ assertTrue(range + " does not contain " + offset, range.contains(offset))
+ case None => fail("No range found, but expected to find " + offset)
+ }
+ }
+
+ class SimpleRange(val start: Long, val size: Long) extends Range
+
+ def makeRanges(breaks: Int*): Array[Range] = {
+ val list = new ArrayList[Range]
+ var prior = 0
+ for(brk <- breaks) {
+ list.add(new SimpleRange(prior, brk - prior))
+ prior = brk
+ }
+ list.toArray(new Array[Range](list.size))
+ }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/SegmentListTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/SegmentListTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/SegmentListTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/SegmentListTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import junit.framework.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+class SegmentListTest extends JUnitSuite {
+
+ @Test
+ def testAppend() {
+ val list = List(1, 2, 3, 4)
+ val sl = new SegmentList(list)
+ val view = sl.view
+ assertEquals(list, view.iterator.toList)
+ sl.append(5)
+ assertEquals("Appending to both should result in list that are still equals",
+ list ::: List(5), sl.view.iterator.toList)
+ assertEquals("But the prior view should still equal the original list", list, view.iterator.toList)
+ }
+
+ @Test
+ def testTrunc() {
+ val hd = List(1,2,3)
+ val tail = List(4,5,6)
+ val sl = new SegmentList(hd ::: tail)
+ val view = sl.view
+ assertEquals(hd ::: tail, view.iterator.toList)
+ val deleted = sl.trunc(3)
+ assertEquals(tail, sl.view.iterator.toList)
+ assertEquals(hd, deleted.iterator.toList)
+ assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+ }
+
+ @Test
+ def testTruncBeyondList() {
+ val sl = new SegmentList(List(1, 2))
+ sl.trunc(3)
+ assertEquals(0, sl.view.length)
+ }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.log4j
+
+import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.{PropertyConfigurator, Logger}
+import java.util.Properties
+import java.io.File
+import kafka.consumer.SimpleConsumer
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.TestUtils
+import kafka.utils.Utils
+import junit.framework.Assert._
+import kafka.api.FetchRequest
+import kafka.serializer.Encoder
+import kafka.message.{MessageSet, Message}
+import kafka.producer.async.MissingConfigException
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+
+class KafkaLog4jAppenderTest extends JUnitSuite {
+
+ var logDir: File = null
+ // var topicLogDir: File = null
+ var server: KafkaServer = null
+ val brokerPort: Int = 9092
+ var simpleConsumer: SimpleConsumer = null
+ val tLogger = Logger.getLogger(getClass())
+
+ @Before
+ def setUp() {
+ val config: Properties = createBrokerConfig(1, brokerPort)
+ val logDirPath = config.getProperty("log.dir")
+ logDir = new File(logDirPath)
+
+ server = TestUtils.createServer(new KafkaConfig(config))
+ Thread.sleep(100)
+ simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
+ }
+
+ @After
+ def tearDown() {
+ simpleConsumer.close
+ server.shutdown
+ Thread.sleep(100)
+ Utils.rm(logDir)
+ }
+
+ @Test
+ def testKafkaLog4jConfigs() {
+ var props = new Properties()
+ props.put("log4j.rootLogger", "INFO")
+ props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+ props.put("log4j.appender.KAFKA.Host", "localhost")
+ props.put("log4j.appender.KAFKA.Topic", "test-topic")
+ props.put("log4j.appender.KAFKA.encoder", "kafka.log4j.AppenderStringEncoder")
+ props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
+
+ // port missing
+ try {
+ PropertyConfigurator.configure(props)
+ fail("Missing properties exception was expected !")
+ }catch {
+ case e: MissingConfigException =>
+ }
+
+ props = new Properties()
+ props.put("log4j.rootLogger", "INFO")
+ props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+ props.put("log4j.appender.KAFKA.Topic", "test-topic")
+ props.put("log4j.appender.KAFKA.Encoder", "kafka.log4j.AppenderStringEncoder")
+ props.put("log4j.appender.KAFKA.Port", "9092")
+ props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
+
+ // host missing
+ try {
+ PropertyConfigurator.configure(props)
+ fail("Missing properties exception was expected !")
+ }catch {
+ case e: MissingConfigException =>
+ }
+
+ props = new Properties()
+ props.put("log4j.rootLogger", "INFO")
+ props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+ props.put("log4j.appender.KAFKA.Host", "localhost")
+ props.put("log4j.appender.KAFKA.Port", "9092")
+ props.put("log4j.appender.KAFKA.Encoder", "kafka.log4j.AppenderStringEncoder")
+ props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
+
+ // topic missing
+ try {
+ PropertyConfigurator.configure(props)
+ fail("Missing properties exception was expected !")
+ }catch {
+ case e: MissingConfigException =>
+ }
+
+ props = new Properties()
+ props.put("log4j.rootLogger", "INFO")
+ props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+ props.put("log4j.appender.KAFKA.Host", "localhost")
+ props.put("log4j.appender.KAFKA.Topic", "test-topic")
+ props.put("log4j.appender.KAFKA.Port", "9092")
+ props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
+
+ // serializer missing
+ try {
+ PropertyConfigurator.configure(props)
+ }catch {
+ case e: MissingConfigException => fail("should default to kafka.producer.DefaultStringEncoder")
+ }
+ }
+
+ @Test
+ def testLog4jAppends() {
+ PropertyConfigurator.configure(getLog4jConfig)
+ val logger = Logger.getLogger(classOf[KafkaLog4jAppenderTest])
+
+ for(i <- 1 to 5)
+ logger.info("test")
+
+ Thread.sleep(500)
+
+ var offset = 0L
+ val messages = simpleConsumer.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024))
+
+ var count = 0
+ for(message <- messages) {
+ count = count + 1
+ offset += message.offset
+ }
+
+ assertEquals(5, count)
+ }
+
+
+ private def getLog4jConfig: Properties = {
+ var props = new Properties()
+ props.put("log4j.rootLogger", "INFO")
+ props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
+ props.put("log4j.appender.KAFKA.Port", "9092")
+ props.put("log4j.appender.KAFKA.Host", "localhost")
+ props.put("log4j.appender.KAFKA.Topic", "test-topic")
+ props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
+ props
+ }
+
+ private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+ val props = new Properties
+ props.put("brokerid", nodeId.toString)
+ props.put("port", port.toString)
+ props.put("log.dir", getLogDir.getAbsolutePath)
+ props.put("log.flush.interval", "1")
+ props.put("enable.zookeeper", "false")
+ props.put("num.partitions", "1")
+ props.put("log.retention.hours", "10")
+ props.put("log.cleanup.interval.mins", "5")
+ props.put("log.file.size", "1000")
+ props
+ }
+
+ private def getLogDir(): File = {
+ val dir = TestUtils.tempDir()
+ dir
+ }
+}
+
+class AppenderStringEncoder extends Encoder[LoggingEvent] {
+ def toMessage(event: LoggingEvent):Message = {
+ val logMessage = event.getMessage
+ new Message(logMessage.asInstanceOf[String].getBytes)
+ }
+}
+